aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/archive/yarchive.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/archive/yarchive.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/archive/yarchive.cpp')
-rw-r--r--library/cpp/archive/yarchive.cpp398
1 files changed, 398 insertions, 0 deletions
diff --git a/library/cpp/archive/yarchive.cpp b/library/cpp/archive/yarchive.cpp
new file mode 100644
index 0000000000..1becc3e5da
--- /dev/null
+++ b/library/cpp/archive/yarchive.cpp
@@ -0,0 +1,398 @@
+#include "yarchive.h"
+
+#include <util/generic/algorithm.h>
+#include <util/generic/hash.h>
+#include <util/generic/utility.h>
+#include <util/generic/vector.h>
+#include <util/generic/yexception.h>
+#include <util/memory/blob.h>
+#include <util/memory/tempbuf.h>
+#include <util/stream/input.h>
+#include <util/stream/length.h>
+#include <util/stream/mem.h>
+#include <util/stream/output.h>
+#include <util/stream/zlib.h>
+#include <util/system/byteorder.h>
+#include <util/ysaveload.h>
+
+template <class T>
+static inline void ESSave(IOutputStream* out, const T& t_in) {
+ T t = HostToLittle(t_in);
+
+ out->Write((const void*)&t, sizeof(t));
+}
+
+static inline void ESSave(IOutputStream* out, const TString& s) {
+ ESSave(out, (ui32) s.size());
+ out->Write(s.data(), s.size());
+}
+
+template <class T>
+static inline T ESLoad(IInputStream* in) {
+ T t = T();
+
+ if (in->Load(&t, sizeof(t)) != sizeof(t)) {
+ ythrow TSerializeException() << "malformed archive";
+ }
+
+ return LittleToHost(t);
+}
+
+template <>
+inline TString ESLoad<TString>(IInputStream* in) {
+ size_t len = ESLoad<ui32>(in);
+ TString ret;
+ TTempBuf tmp;
+
+ while (len) {
+ const size_t toread = Min(len, tmp.Size());
+ const size_t readed = in->Read(tmp.Data(), toread);
+
+ if (!readed) {
+ ythrow TSerializeException() << "malformed archive";
+ }
+
+ ret.append(tmp.Data(), readed);
+ len -= readed;
+ }
+
+ return ret;
+}
+
namespace {
    // Metadata for one archive entry: where its stored payload lives inside
    // the archive blob, and the key under which it is looked up.
    class TArchiveRecordDescriptor: public TSimpleRefCount<TArchiveRecordDescriptor> {
    public:
        inline TArchiveRecordDescriptor(ui64 off, ui64 len, const TString& name)
            : Off_(off)
            , Len_(len)
            , Name_(name)
        {
        }

        // Deserializing constructor. Fields are read from the stream in
        // member-declaration order (Off_, Len_, Name_), which must match the
        // order SaveTo() writes them — do not reorder the members below.
        inline TArchiveRecordDescriptor(IInputStream* in)
            : Off_(ESLoad<ui64>(in))
            , Len_(ESLoad<ui64>(in))
            , Name_(ESLoad<TString>(in))
        {
        }

        inline ~TArchiveRecordDescriptor() = default;

        // Writes fields in the same order the stream constructor reads them.
        inline void SaveTo(IOutputStream* out) const {
            ESSave(out, Off_);
            ESSave(out, Len_);
            ESSave(out, Name_);
        }

        inline const TString& Name() const noexcept {
            return Name_;
        }

        inline ui64 Length() const noexcept {
            return Len_;
        }

        inline ui64 Offset() const noexcept {
            return Off_;
        }

    private:
        ui64 Off_;     // payload offset within the archive body
        ui64 Len_;     // stored payload length in bytes
        TString Name_; // lookup key
    };

    typedef TIntrusivePtr<TArchiveRecordDescriptor> TArchiveRecordDescriptorRef;
}
+
class TArchiveWriter::TImpl {
    using TDict = THashMap<TString, TArchiveRecordDescriptorRef>;

public:
    // Streams archive records into `out` (pointer kept, not owned).
    // `compress` selects whether each record's payload is zlib-compressed;
    // uncompressed payloads are padded to ArchiveWriterDefaultDataAlignment.
    inline TImpl(IOutputStream& out, bool compress)
        : Off_(0)
        , Out_(&out)
        , UseCompression(compress)
    {
    }

    inline ~TImpl() = default;

    inline void Flush() {
        Out_->Flush();
    }

    // Writes the archive trailer:
    //   [zlib(ui32 count, descriptors..., ui8 compression-flag)][ui64 length]
    // The final uncompressed ui64 is the byte length of the compressed
    // dictionary, letting a reader locate it from the end of the file.
    inline void Finish() {
        TCountingOutput out(Out_);

        {
            TZLibCompress compress(&out);

            ESSave(&compress, (ui32)Dict_.size());

            for (const auto& kv : Dict_) {
                kv.second->SaveTo(&compress);
            }

            // Newer-format marker; old readers simply stop reading before it.
            ESSave(&compress, static_cast<ui8>(UseCompression));

            compress.Finish();
        }

        // Counter() == size of the compressed dictionary just written.
        ESSave(Out_, out.Counter());

        Out_->Flush();
    }

    // Appends one record under `key`. The descriptor records the offset the
    // payload starts at and the number of bytes actually stored (compressed
    // size when compression is on).
    inline void Add(const TString& key, IInputStream* src) {
        Y_ENSURE(!Dict_.contains(key), "key " << key.data() << " already stored");

        TCountingOutput out(Out_);
        if (UseCompression) {
            TZLibCompress compress(&out);
            TransferData(src, &compress);
            compress.Finish();
        } else {
            // Pad with NUL bytes up to the next alignment boundary. Padding
            // is written directly to Out_ (bypassing the counting stream) so
            // it is not included in the record's stored length; Off_ is
            // advanced past it here instead.
            size_t skip_size = ArchiveWriterDefaultDataAlignment - Off_ % ArchiveWriterDefaultDataAlignment;
            if (skip_size == ArchiveWriterDefaultDataAlignment) {
                skip_size = 0;
            }
            while(skip_size > 0) {
                Out_->Write(char(0));
                Off_ += 1;
                skip_size -= 1;
            }
            TransferData(src, &out);
            out.Finish();
        }

        // Off_ now points at the (post-padding) start of this payload.
        TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(Off_, out.Counter(), key));

        Dict_[key] = descr;
        Off_ += out.Counter();
    }

    // Registers `newKey` as an alias sharing the stored payload of
    // `existingKey`; no payload data is written.
    inline void AddSynonym(const TString& existingKey, const TString& newKey) {
        Y_ENSURE(Dict_.contains(existingKey), "key " << existingKey.data() << " not stored yet");
        Y_ENSURE(!Dict_.contains(newKey), "key " << newKey.data() << " already stored");

        TArchiveRecordDescriptorRef existingDescr = Dict_[existingKey];
        TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(existingDescr->Offset(), existingDescr->Length(), newKey));

        Dict_[newKey] = descr;
    }

private:
    ui64 Off_;                 // current write offset within the archive body
    IOutputStream* Out_;       // destination stream (not owned)
    TDict Dict_;               // key -> descriptor for every record added so far
    const bool UseCompression; // whether payloads are zlib-compressed
};
+
// The writer stores a pointer to `out`; the stream is not owned and must
// stay valid for the writer's lifetime.
TArchiveWriter::TArchiveWriter(IOutputStream* out, bool compress)
    : Impl_(new TImpl(*out, compress))
{
}
+
TArchiveWriter::~TArchiveWriter() {
    try {
        // Best effort: write the trailer if the caller forgot to Finish().
        Finish();
    } catch (...) {
        // Destructors must not throw; errors here are deliberately dropped.
        // Call Finish() explicitly to observe failures.
    }
}
+
+void TArchiveWriter::Flush() {
+ if (Impl_.Get()) {
+ Impl_->Flush();
+ }
+}
+
+void TArchiveWriter::Finish() {
+ if (Impl_.Get()) {
+ Impl_->Finish();
+ Impl_.Destroy();
+ }
+}
+
// Stores the contents of `src` under `key`. Fails after Finish(), which
// destroys the implementation.
void TArchiveWriter::Add(const TString& key, IInputStream* src) {
    Y_ENSURE(Impl_.Get(), "archive already closed");

    Impl_->Add(key, src);
}
+
// Registers `newKey` as an alias for the already-stored `existingKey`
// without writing any payload. Fails after Finish().
void TArchiveWriter::AddSynonym(const TString& existingKey, const TString& newKey) {
    Y_ENSURE(Impl_.Get(), "archive already closed");

    Impl_->AddSynonym(existingKey, newKey);
}
+
namespace {
    // Holds a record's sub-blob plus a memory stream over it, so a derived
    // decompressing stream can read data whose lifetime this base guarantees.
    class TArchiveInputStreamBase {
    public:
        inline TArchiveInputStreamBase(const TBlob& b)
            : Blob_(b)
            , Input_(b.Data(), b.Size())
        {
        }

    protected:
        TBlob Blob_;         // keeps the record's bytes alive
        TMemoryInput Input_; // reads from Blob_'s memory
    };

    // Decompressing stream over one record blob. TArchiveInputStreamBase is
    // listed first in the base list so Input_ is fully constructed before
    // TZLibDecompress takes its address.
    class TArchiveInputStream: public TArchiveInputStreamBase, public TZLibDecompress {
    public:
        inline TArchiveInputStream(const TBlob& b)
            : TArchiveInputStreamBase(b)
            , TZLibDecompress(&Input_)
        {
        }

        ~TArchiveInputStream() override = default;
    };
}
+
+class TArchiveReader::TImpl {
+ typedef THashMap<TString, TArchiveRecordDescriptorRef> TDict;
+
+public:
+ inline TImpl(const TBlob& blob)
+ : Blob_(blob)
+ , UseDecompression(true)
+ {
+ ReadDict();
+ }
+
+ inline ~TImpl() = default;
+
+ inline void ReadDict() {
+ Y_ENSURE(Blob_.Size() >= sizeof(ui64), "too small blob");
+
+ const char* end = (const char*)Blob_.End();
+ const char* ptr = end - sizeof(ui64);
+ ui64 dictlen = 0;
+ memcpy(&dictlen, ptr, sizeof(ui64));
+ dictlen = LittleToHost(dictlen);
+
+ Y_ENSURE(dictlen <= Blob_.Size() - sizeof(ui64), "bad blob");
+
+ const char* beg = ptr - dictlen;
+ TMemoryInput mi(beg, dictlen);
+ TZLibDecompress d(&mi);
+ const ui32 count = ESLoad<ui32>(&d);
+
+ for (size_t i = 0; i < count; ++i) {
+ TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(&d));
+
+ Recs_.push_back(descr);
+ Dict_[descr->Name()] = descr;
+ }
+ Sort(Recs_.begin(), Recs_.end(), [](const auto& lhs, const auto& rhs) -> bool {
+ return lhs->Offset() < rhs->Offset();
+ });
+
+ try {
+ UseDecompression = static_cast<bool>(ESLoad<ui8>(&d));
+ } catch (const TSerializeException&) {
+ // that's ok - just old format
+ UseDecompression = true;
+ }
+ }
+
+ inline size_t Count() const noexcept {
+ return Recs_.size();
+ }
+
+ inline TString KeyByIndex(size_t n) const {
+ if (n < Count()) {
+ return Recs_[n]->Name();
+ }
+
+ ythrow yexception() << "incorrect index";
+ }
+
+ inline bool Has(const TStringBuf key) const {
+ return Dict_.contains(key);
+ }
+
+ inline TAutoPtr<IInputStream> ObjectByKey(const TStringBuf key) const {
+ TBlob subBlob = BlobByKey(key);
+
+ if (UseDecompression) {
+ return new TArchiveInputStream(subBlob);
+ } else {
+ return new TMemoryInput(subBlob.Data(), subBlob.Length());
+ }
+ }
+
+ inline TBlob ObjectBlobByKey(const TStringBuf key) const {
+ TBlob subBlob = BlobByKey(key);
+
+ if (UseDecompression) {
+ TArchiveInputStream st(subBlob);
+ return TBlob::FromStream(st);
+ } else {
+ return subBlob;
+ }
+ }
+
+ inline TBlob BlobByKey(const TStringBuf key) const {
+ const auto it = Dict_.find(key);
+
+ Y_ENSURE(it != Dict_.end(), "key " << key.data() << " not found");
+
+ const size_t off = it->second->Offset();
+ const size_t len = it->second->Length();
+
+ /*
+ * TODO - overflow check
+ */
+
+ return Blob_.SubBlob(off, off + len);
+ }
+
+ inline bool Compressed() const {
+ return UseDecompression;
+ }
+
+private:
+ TBlob Blob_;
+ TVector<TArchiveRecordDescriptorRef> Recs_;
+ TDict Dict_;
+ bool UseDecompression;
+};
+
// Stores a copy of the blob handle and eagerly parses the dictionary
// (TImpl's constructor calls ReadDict), so a malformed archive throws here.
TArchiveReader::TArchiveReader(const TBlob& data)
    : Impl_(new TImpl(data))
{
}
+
+TArchiveReader::~TArchiveReader() {}
+
// Number of records (synonyms included) in the archive.
size_t TArchiveReader::Count() const noexcept {
    return Impl_->Count();
}
+
// Key of the n-th record in storage order; throws on an out-of-range index.
TString TArchiveReader::KeyByIndex(size_t n) const {
    return Impl_->KeyByIndex(n);
}
+
// True if a record (or synonym) with this key exists.
bool TArchiveReader::Has(const TStringBuf key) const {
    return Impl_->Has(key);
}
+
// Stream over the record's payload, decompressed on the fly when needed;
// throws if the key is absent.
TAutoPtr<IInputStream> TArchiveReader::ObjectByKey(const TStringBuf key) const {
    return Impl_->ObjectByKey(key);
}
+
// Fully decompressed payload as a blob; throws if the key is absent.
TBlob TArchiveReader::ObjectBlobByKey(const TStringBuf key) const {
    return Impl_->ObjectBlobByKey(key);
}
+
// Raw stored bytes of the record (still compressed for compressed
// archives); throws if the key is absent.
TBlob TArchiveReader::BlobByKey(const TStringBuf key) const {
    return Impl_->BlobByKey(key);
}
+
// Whether record payloads in this archive are stored zlib-compressed.
bool TArchiveReader::Compressed() const {
    return Impl_->Compressed();
}