aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/on_disk/multi_blob
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-11-30 13:26:22 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-11-30 15:44:45 +0300
commit0a98fece5a9b54f16afeb3a94b3eb3105e9c3962 (patch)
tree291d72dbd7e9865399f668c84d11ed86fb190bbf /library/cpp/on_disk/multi_blob
parentcb2c8d75065e5b3c47094067cb4aa407d4813298 (diff)
downloadydb-0a98fece5a9b54f16afeb3a94b3eb3105e9c3962.tar.gz
YQ Connector:Use docker-compose in integrational tests
Diffstat (limited to 'library/cpp/on_disk/multi_blob')
-rw-r--r--library/cpp/on_disk/multi_blob/multiblob.cpp67
-rw-r--r--library/cpp/on_disk/multi_blob/multiblob.h77
-rw-r--r--library/cpp/on_disk/multi_blob/multiblob_builder.cpp146
-rw-r--r--library/cpp/on_disk/multi_blob/multiblob_builder.h64
-rw-r--r--library/cpp/on_disk/multi_blob/ya.make13
5 files changed, 367 insertions, 0 deletions
diff --git a/library/cpp/on_disk/multi_blob/multiblob.cpp b/library/cpp/on_disk/multi_blob/multiblob.cpp
new file mode 100644
index 0000000000..d92b31e613
--- /dev/null
+++ b/library/cpp/on_disk/multi_blob/multiblob.cpp
@@ -0,0 +1,67 @@
+#include <util/generic/yexception.h>
+#include <util/system/align.h>
+
+#include <library/cpp/on_disk/chunks/reader.h>
+
+#include "multiblob.h"
+
+void TSubBlobs::ReadMultiBlob(const TBlob& multi) {
+ if (multi.Size() < sizeof(TMultiBlobHeader)) {
+ ythrow yexception() << "not a blob, too small";
+ }
+
+ Multi = multi;
+ memcpy((void*)&Header, Multi.Data(), sizeof(TMultiBlobHeader));
+
+ if (Header.BlobMetaSig != BLOBMETASIG) {
+ if (Header.BlobRecordSig != TMultiBlobHeader::RecordSig) {
+ if (ReadChunkedData(multi))
+ return;
+ }
+ ythrow yexception() << "is not a blob, MetaSig was read: "
+ << Header.BlobMetaSig
+ << ", must be" << BLOBMETASIG;
+ }
+
+ if (Header.BlobRecordSig != TMultiBlobHeader::RecordSig)
+ ythrow yexception() << "unknown multiblob RecordSig "
+ << Header.BlobRecordSig;
+
+ reserve(size() + Header.Count);
+ if (Header.Flags & EMF_INTERLAY) {
+ size_t pos = Header.HeaderSize();
+ for (size_t i = 0; i < Header.Count; ++i) {
+ pos = AlignUp<ui64>(pos, sizeof(ui64));
+ ui64 size = *((ui64*)((const char*)multi.Data() + pos));
+ pos = AlignUp<ui64>(pos + sizeof(ui64), Header.Align);
+ push_back(multi.SubBlob(pos, pos + size));
+ pos += size;
+ }
+ } else {
+ const ui64* sizes = Header.Sizes(multi.Data());
+ size_t pos = Header.HeaderSize() + Header.Count * sizeof(ui64);
+ for (size_t i = 0; i < Header.Count; ++i) {
+ pos = AlignUp<ui64>(pos, Header.Align);
+ push_back(multi.SubBlob(pos, pos + *sizes));
+ pos += *sizes;
+ sizes++;
+ }
+ }
+}
+
+bool TSubBlobs::ReadChunkedData(const TBlob& multi) noexcept {
+ Multi = multi;
+ memset((void*)&Header, 0, sizeof(Header));
+
+ TChunkedDataReader reader(Multi);
+ Header.Count = reader.GetBlocksCount();
+ resize(GetHeader()->Count);
+ for (size_t i = 0; i < size(); ++i)
+ // We can use TBlob::NoCopy() because of reader.GetBlock(i) returns
+ // address into memory of multi blob.
+ // This knowledge was acquired from implementation of
+ // TChunkedDataReader, so we need care about any changes that.
+ (*this)[i] = TBlob::NoCopy(reader.GetBlock(i), reader.GetBlockLen(i));
+ Header.Flags |= EMF_CHUNKED_DATA_READER;
+ return true;
+}
diff --git a/library/cpp/on_disk/multi_blob/multiblob.h b/library/cpp/on_disk/multi_blob/multiblob.h
new file mode 100644
index 0000000000..b40a5ae6af
--- /dev/null
+++ b/library/cpp/on_disk/multi_blob/multiblob.h
@@ -0,0 +1,77 @@
+#pragma once
+
+#include <util/generic/vector.h>
+#include <util/memory/blob.h>
+
+#define BLOBMETASIG 0x3456789Au
+
+enum E_Multiblob_Flags {
+ // if EMF_INTERLAY is clear
+ // multiblob format
+ // HeaderSize() bytes for TMultiBlobHeader
+ // Count*sizeof(ui64) bytes for blob sizes
+ // blob1
+ // (alignment)
+ // blob2
+ // (alignment)
+ // ...
+ // (alignment)
+ // blobn
+ // if EMF_INTERLAY is set
+ // multiblob format
+ // HeaderSize() bytes for TMultiBlobHeader
+ // size1 ui64, the size of 1st blob
+ // blob1
+ // (alignment)
+ // size2 ui64, the size of 2nd blob
+ // blob2
+ // (alignment)
+ // ...
+ // (alignment)
+ // sizen ui64, the size of n'th blob
+ // blobn
+ EMF_INTERLAY = 1,
+
+ // Means that multiblob contains blocks in TChunkedDataReader format
+ // Legacy, use it only for old files, created for TChunkedDataReader
+ EMF_CHUNKED_DATA_READER = 2,
+
+ // Flags that may be configured for blobbuilder in client code
+ EMF_WRITEABLE = EMF_INTERLAY,
+};
+
+struct TMultiBlobHeader {
+ // data
+ ui32 BlobMetaSig;
+ ui32 BlobRecordSig;
+ ui64 Count; // count of sub blobs
+ ui32 Align; // alignment for every subblob
+ ui32 Flags;
+ static const ui32 RecordSig = 0x23456789;
+ static inline size_t HeaderSize() {
+ return 4 * sizeof(ui64);
+ }
+ inline const ui64* Sizes(const void* Data) const {
+ return (const ui64*)((const char*)Data + HeaderSize());
+ }
+};
+
+class TSubBlobs: public TVector<TBlob> {
+public:
+ TSubBlobs() {
+ }
+ TSubBlobs(const TBlob& multi) {
+ ReadMultiBlob(multi);
+ }
+ void ReadMultiBlob(const TBlob& multi);
+ const TMultiBlobHeader* GetHeader() const {
+ return (const TMultiBlobHeader*)&Header;
+ }
+
+protected:
+ TMultiBlobHeader Header;
+ TBlob Multi;
+
+private:
+ bool ReadChunkedData(const TBlob& multi) noexcept;
+};
diff --git a/library/cpp/on_disk/multi_blob/multiblob_builder.cpp b/library/cpp/on_disk/multi_blob/multiblob_builder.cpp
new file mode 100644
index 0000000000..44aa4a6c2f
--- /dev/null
+++ b/library/cpp/on_disk/multi_blob/multiblob_builder.cpp
@@ -0,0 +1,146 @@
+#include <util/memory/tempbuf.h>
+#include <util/system/align.h>
+
+#include "multiblob_builder.h"
+
+/*
+ * TBlobSaverMemory
+ */
+TBlobSaverMemory::TBlobSaverMemory(const void* ptr, size_t size)
+ : Blob(TBlob::NoCopy(ptr, size))
+{
+}
+
+TBlobSaverMemory::TBlobSaverMemory(const TBlob& blob)
+ : Blob(blob)
+{
+}
+
+void TBlobSaverMemory::Save(IOutputStream& output, ui32 /*flags*/) {
+ output.Write((void*)Blob.Data(), Blob.Length());
+}
+
+size_t TBlobSaverMemory::GetLength() {
+ return Blob.Length();
+}
+
+/*
+ * TBlobSaverFile
+ */
+
+TBlobSaverFile::TBlobSaverFile(TFile file)
+ : File(file)
+{
+ Y_ASSERT(File.IsOpen());
+}
+
+TBlobSaverFile::TBlobSaverFile(const char* filename, EOpenMode oMode)
+ : File(filename, oMode)
+{
+ Y_ASSERT(File.IsOpen());
+}
+
+void TBlobSaverFile::Save(IOutputStream& output, ui32 /*flags*/) {
+ TTempBuf buffer(1 << 20);
+ while (size_t size = File.Read((void*)buffer.Data(), buffer.Size()))
+ output.Write((void*)buffer.Data(), size);
+}
+
+size_t TBlobSaverFile::GetLength() {
+ return File.GetLength();
+}
+
+/*
+ * TMultiBlobBuilder
+ */
+
+TMultiBlobBuilder::TMultiBlobBuilder(bool isOwn)
+ : IsOwner(isOwn)
+{
+}
+
+TMultiBlobBuilder::~TMultiBlobBuilder() {
+ if (IsOwner)
+ DeleteSubBlobs();
+}
+
+namespace {
+ ui64 PadToAlign(IOutputStream& output, ui64 fromPos, ui32 align) {
+ ui64 toPos = AlignUp<ui64>(fromPos, align);
+ for (; fromPos < toPos; ++fromPos) {
+ output << (char)0;
+ }
+ return toPos;
+ }
+}
+
+void TMultiBlobBuilder::Save(IOutputStream& output, ui32 flags) {
+ TMultiBlobHeader header;
+ memset((void*)&header, 0, sizeof(header));
+ header.BlobMetaSig = BLOBMETASIG;
+ header.BlobRecordSig = TMultiBlobHeader::RecordSig;
+ header.Count = Blobs.size();
+ header.Align = ALIGN;
+ header.Flags = flags & EMF_WRITEABLE;
+ output.Write((void*)&header, sizeof(header));
+ for (size_t i = sizeof(header); i < header.HeaderSize(); ++i)
+ output << (char)0;
+ ui64 pos = header.HeaderSize();
+ if (header.Flags & EMF_INTERLAY) {
+ for (size_t i = 0; i < Blobs.size(); ++i) {
+ ui64 size = Blobs[i]->GetLength();
+ pos = PadToAlign(output, pos, sizeof(ui64)); // Align size record
+ output.Write((void*)&size, sizeof(ui64));
+ pos = PadToAlign(output, pos + sizeof(ui64), header.Align); // Align blob
+ Blobs[i]->Save(output, header.Flags);
+ pos += size;
+ }
+ } else {
+ for (size_t i = 0; i < Blobs.size(); ++i) {
+ ui64 size = Blobs[i]->GetLength();
+ output.Write((void*)&size, sizeof(ui64));
+ }
+ pos += Blobs.size() * sizeof(ui64);
+ for (size_t i = 0; i < Blobs.size(); ++i) {
+ pos = PadToAlign(output, pos, header.Align);
+ Blobs[i]->Save(output, header.Flags);
+ pos += Blobs[i]->GetLength();
+ }
+ }
+ // Compensate for imprecise size
+ for (ui64 len = GetLength(); pos < len; ++pos) {
+ output << (char)0;
+ }
+}
+
+size_t TMultiBlobBuilder::GetLength() {
+ // Sizes may be diferent with and without EMF_INTERLAY, so choose greater of 2
+ size_t resNonInter = TMultiBlobHeader::HeaderSize() + Blobs.size() * sizeof(ui64);
+ size_t resInterlay = TMultiBlobHeader::HeaderSize();
+ for (size_t i = 0; i < Blobs.size(); ++i) {
+ resInterlay = AlignUp<ui64>(resInterlay, sizeof(ui64)) + sizeof(ui64);
+ resInterlay = AlignUp<ui64>(resInterlay, ALIGN) + Blobs[i]->GetLength();
+ resNonInter = AlignUp<ui64>(resNonInter, ALIGN) + Blobs[i]->GetLength();
+ }
+ resInterlay = AlignUp<ui64>(resInterlay, ALIGN);
+ resNonInter = AlignUp<ui64>(resNonInter, ALIGN);
+ return Max(resNonInter, resInterlay);
+}
+
+TMultiBlobBuilder::TSavers& TMultiBlobBuilder::GetBlobs() {
+ return Blobs;
+}
+
+const TMultiBlobBuilder::TSavers& TMultiBlobBuilder::GetBlobs() const {
+ return Blobs;
+}
+
+void TMultiBlobBuilder::AddBlob(IBlobSaverBase* blob) {
+ Blobs.push_back(blob);
+}
+
+void TMultiBlobBuilder::DeleteSubBlobs() {
+ for (size_t i = 0; i < Blobs.size(); ++i)
+ delete Blobs[i];
+ Blobs.clear();
+}
diff --git a/library/cpp/on_disk/multi_blob/multiblob_builder.h b/library/cpp/on_disk/multi_blob/multiblob_builder.h
new file mode 100644
index 0000000000..a8e3c6d35e
--- /dev/null
+++ b/library/cpp/on_disk/multi_blob/multiblob_builder.h
@@ -0,0 +1,64 @@
+#pragma once
+
+#include <util/system/align.h>
+#include <util/stream/output.h>
+#include <util/stream/file.h>
+#include <util/draft/holder_vector.h>
+
+#include "multiblob.h"
+
+class IBlobSaverBase {
+public:
+ virtual ~IBlobSaverBase() {
+ }
+ virtual void Save(IOutputStream& output, ui32 flags = 0) = 0;
+ virtual size_t GetLength() = 0;
+};
+
+inline void MultiBlobSave(IOutputStream& output, IBlobSaverBase& saver) {
+ saver.Save(output);
+}
+
+class TBlobSaverMemory: public IBlobSaverBase {
+public:
+ TBlobSaverMemory(const void* ptr, size_t size);
+ TBlobSaverMemory(const TBlob& blob);
+ void Save(IOutputStream& output, ui32 flags = 0) override;
+ size_t GetLength() override;
+
+private:
+ TBlob Blob;
+};
+
+class TBlobSaverFile: public IBlobSaverBase {
+public:
+ TBlobSaverFile(TFile file);
+ TBlobSaverFile(const char* filename, EOpenMode oMode = RdOnly);
+ void Save(IOutputStream& output, ui32 flags = 0) override;
+ size_t GetLength() override;
+
+protected:
+ TFile File;
+};
+
+class TMultiBlobBuilder: public IBlobSaverBase {
+protected:
+ // Data will be stored with default alignment DEVTOOLS-4548
+ static const size_t ALIGN = 16;
+
+public:
+ typedef TVector<IBlobSaverBase*> TSavers;
+
+ TMultiBlobBuilder(bool isOwn = true);
+ ~TMultiBlobBuilder() override;
+ void Save(IOutputStream& output, ui32 flags = 0) override;
+ size_t GetLength() override;
+ TSavers& GetBlobs();
+ const TSavers& GetBlobs() const;
+ void AddBlob(IBlobSaverBase* blob);
+ void DeleteSubBlobs();
+
+protected:
+ TSavers Blobs;
+ bool IsOwner;
+};
diff --git a/library/cpp/on_disk/multi_blob/ya.make b/library/cpp/on_disk/multi_blob/ya.make
new file mode 100644
index 0000000000..50615fc901
--- /dev/null
+++ b/library/cpp/on_disk/multi_blob/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ multiblob.cpp
+ multiblob_builder.cpp
+)
+
+PEERDIR(
+ library/cpp/on_disk/chunks
+ util/draft
+)
+
+END()