diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-30 13:26:22 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-30 15:44:45 +0300 |
commit | 0a98fece5a9b54f16afeb3a94b3eb3105e9c3962 (patch) | |
tree | 291d72dbd7e9865399f668c84d11ed86fb190bbf /library/cpp/on_disk/multi_blob | |
parent | cb2c8d75065e5b3c47094067cb4aa407d4813298 (diff) | |
download | ydb-0a98fece5a9b54f16afeb3a94b3eb3105e9c3962.tar.gz |
YQ Connector:Use docker-compose in integrational tests
Diffstat (limited to 'library/cpp/on_disk/multi_blob')
-rw-r--r-- | library/cpp/on_disk/multi_blob/multiblob.cpp | 67 | ||||
-rw-r--r-- | library/cpp/on_disk/multi_blob/multiblob.h | 77 | ||||
-rw-r--r-- | library/cpp/on_disk/multi_blob/multiblob_builder.cpp | 146 | ||||
-rw-r--r-- | library/cpp/on_disk/multi_blob/multiblob_builder.h | 64 | ||||
-rw-r--r-- | library/cpp/on_disk/multi_blob/ya.make | 13 |
5 files changed, 367 insertions, 0 deletions
diff --git a/library/cpp/on_disk/multi_blob/multiblob.cpp b/library/cpp/on_disk/multi_blob/multiblob.cpp new file mode 100644 index 0000000000..d92b31e613 --- /dev/null +++ b/library/cpp/on_disk/multi_blob/multiblob.cpp @@ -0,0 +1,67 @@ +#include <util/generic/yexception.h> +#include <util/system/align.h> + +#include <library/cpp/on_disk/chunks/reader.h> + +#include "multiblob.h" + +void TSubBlobs::ReadMultiBlob(const TBlob& multi) { + if (multi.Size() < sizeof(TMultiBlobHeader)) { + ythrow yexception() << "not a blob, too small"; + } + + Multi = multi; + memcpy((void*)&Header, Multi.Data(), sizeof(TMultiBlobHeader)); + + if (Header.BlobMetaSig != BLOBMETASIG) { + if (Header.BlobRecordSig != TMultiBlobHeader::RecordSig) { + if (ReadChunkedData(multi)) + return; + } + ythrow yexception() << "is not a blob, MetaSig was read: " + << Header.BlobMetaSig + << ", must be" << BLOBMETASIG; + } + + if (Header.BlobRecordSig != TMultiBlobHeader::RecordSig) + ythrow yexception() << "unknown multiblob RecordSig " + << Header.BlobRecordSig; + + reserve(size() + Header.Count); + if (Header.Flags & EMF_INTERLAY) { + size_t pos = Header.HeaderSize(); + for (size_t i = 0; i < Header.Count; ++i) { + pos = AlignUp<ui64>(pos, sizeof(ui64)); + ui64 size = *((ui64*)((const char*)multi.Data() + pos)); + pos = AlignUp<ui64>(pos + sizeof(ui64), Header.Align); + push_back(multi.SubBlob(pos, pos + size)); + pos += size; + } + } else { + const ui64* sizes = Header.Sizes(multi.Data()); + size_t pos = Header.HeaderSize() + Header.Count * sizeof(ui64); + for (size_t i = 0; i < Header.Count; ++i) { + pos = AlignUp<ui64>(pos, Header.Align); + push_back(multi.SubBlob(pos, pos + *sizes)); + pos += *sizes; + sizes++; + } + } +} + +bool TSubBlobs::ReadChunkedData(const TBlob& multi) noexcept { + Multi = multi; + memset((void*)&Header, 0, sizeof(Header)); + + TChunkedDataReader reader(Multi); + Header.Count = reader.GetBlocksCount(); + resize(GetHeader()->Count); + for (size_t i = 0; i < size(); ++i) + // We can use TBlob::NoCopy() because of reader.GetBlock(i) returns + // address into memory of multi blob. + // This knowledge was acquired from implementation of + // TChunkedDataReader, so we need care about any changes that. + (*this)[i] = TBlob::NoCopy(reader.GetBlock(i), reader.GetBlockLen(i)); + Header.Flags |= EMF_CHUNKED_DATA_READER; + return true; +} diff --git a/library/cpp/on_disk/multi_blob/multiblob.h b/library/cpp/on_disk/multi_blob/multiblob.h new file mode 100644 index 0000000000..b40a5ae6af --- /dev/null +++ b/library/cpp/on_disk/multi_blob/multiblob.h @@ -0,0 +1,77 @@ +#pragma once + +#include <util/generic/vector.h> +#include <util/memory/blob.h> + +#define BLOBMETASIG 0x3456789Au + +enum E_Multiblob_Flags { + // if EMF_INTERLAY is clear + // multiblob format + // HeaderSize() bytes for TMultiBlobHeader + // Count*sizeof(ui64) bytes for blob sizes + // blob1 + // (alignment) + // blob2 + // (alignment) + // ... + // (alignment) + // blobn + // if EMF_INTERLAY is set + // multiblob format + // HeaderSize() bytes for TMultiBlobHeader + // size1 ui64, the size of 1st blob + // blob1 + // (alignment) + // size2 ui64, the size of 2nd blob + // blob2 + // (alignment) + // ... + // (alignment) + // sizen ui64, the size of n'th blob + // blobn + EMF_INTERLAY = 1, + + // Means that multiblob contains blocks in TChunkedDataReader format + // Legacy, use it only for old files, created for TChunkedDataReader + EMF_CHUNKED_DATA_READER = 2, + + // Flags that may be configured for blobbuilder in client code + EMF_WRITEABLE = EMF_INTERLAY, +}; + +struct TMultiBlobHeader { + // data + ui32 BlobMetaSig; + ui32 BlobRecordSig; + ui64 Count; // count of sub blobs + ui32 Align; // alignment for every subblob + ui32 Flags; + static const ui32 RecordSig = 0x23456789; + static inline size_t HeaderSize() { + return 4 * sizeof(ui64); + } + inline const ui64* Sizes(const void* Data) const { + return (const ui64*)((const char*)Data + HeaderSize()); + } +}; + +class TSubBlobs: public TVector<TBlob> { +public: + TSubBlobs() { + } + TSubBlobs(const TBlob& multi) { + ReadMultiBlob(multi); + } + void ReadMultiBlob(const TBlob& multi); + const TMultiBlobHeader* GetHeader() const { + return (const TMultiBlobHeader*)&Header; + } + +protected: + TMultiBlobHeader Header; + TBlob Multi; + +private: + bool ReadChunkedData(const TBlob& multi) noexcept; +}; diff --git a/library/cpp/on_disk/multi_blob/multiblob_builder.cpp b/library/cpp/on_disk/multi_blob/multiblob_builder.cpp new file mode 100644 index 0000000000..44aa4a6c2f --- /dev/null +++ b/library/cpp/on_disk/multi_blob/multiblob_builder.cpp @@ -0,0 +1,146 @@ +#include <util/memory/tempbuf.h> +#include <util/system/align.h> + +#include "multiblob_builder.h" + +/* + * TBlobSaverMemory + */ +TBlobSaverMemory::TBlobSaverMemory(const void* ptr, size_t size) + : Blob(TBlob::NoCopy(ptr, size)) +{ +} + +TBlobSaverMemory::TBlobSaverMemory(const TBlob& blob) + : Blob(blob) +{ +} + +void TBlobSaverMemory::Save(IOutputStream& output, ui32 /*flags*/) { + output.Write((void*)Blob.Data(), Blob.Length()); +} + +size_t TBlobSaverMemory::GetLength() { + return Blob.Length(); +} + +/* + * TBlobSaverFile + */ + +TBlobSaverFile::TBlobSaverFile(TFile file) + : File(file) +{ + Y_ASSERT(File.IsOpen()); +} + +TBlobSaverFile::TBlobSaverFile(const char* filename, EOpenMode oMode) + : File(filename, oMode) +{ + Y_ASSERT(File.IsOpen()); +} + +void TBlobSaverFile::Save(IOutputStream& output, ui32 /*flags*/) { + TTempBuf buffer(1 << 20); + while (size_t size = File.Read((void*)buffer.Data(), buffer.Size())) + output.Write((void*)buffer.Data(), size); +} + +size_t TBlobSaverFile::GetLength() { + return File.GetLength(); +} + +/* + * TMultiBlobBuilder + */ + +TMultiBlobBuilder::TMultiBlobBuilder(bool isOwn) + : IsOwner(isOwn) +{ +} + +TMultiBlobBuilder::~TMultiBlobBuilder() { + if (IsOwner) + DeleteSubBlobs(); +} + +namespace { + ui64 PadToAlign(IOutputStream& output, ui64 fromPos, ui32 align) { + ui64 toPos = AlignUp<ui64>(fromPos, align); + for (; fromPos < toPos; ++fromPos) { + output << (char)0; + } + return toPos; + } +} + +void TMultiBlobBuilder::Save(IOutputStream& output, ui32 flags) { + TMultiBlobHeader header; + memset((void*)&header, 0, sizeof(header)); + header.BlobMetaSig = BLOBMETASIG; + header.BlobRecordSig = TMultiBlobHeader::RecordSig; + header.Count = Blobs.size(); + header.Align = ALIGN; + header.Flags = flags & EMF_WRITEABLE; + output.Write((void*)&header, sizeof(header)); + for (size_t i = sizeof(header); i < header.HeaderSize(); ++i) + output << (char)0; + ui64 pos = header.HeaderSize(); + if (header.Flags & EMF_INTERLAY) { + for (size_t i = 0; i < Blobs.size(); ++i) { + ui64 size = Blobs[i]->GetLength(); + pos = PadToAlign(output, pos, sizeof(ui64)); // Align size record + output.Write((void*)&size, sizeof(ui64)); + pos = PadToAlign(output, pos + sizeof(ui64), header.Align); // Align blob + Blobs[i]->Save(output, header.Flags); + pos += size; + } + } else { + for (size_t i = 0; i < Blobs.size(); ++i) { + ui64 size = Blobs[i]->GetLength(); + output.Write((void*)&size, sizeof(ui64)); + } + pos += Blobs.size() * sizeof(ui64); + for (size_t i = 0; i < Blobs.size(); ++i) { + pos = PadToAlign(output, pos, header.Align); + Blobs[i]->Save(output, header.Flags); + pos += Blobs[i]->GetLength(); + } + } + // Compensate for imprecise size + for (ui64 len = GetLength(); pos < len; ++pos) { + output << (char)0; + } +} + +size_t TMultiBlobBuilder::GetLength() { + // Sizes may be diferent with and without EMF_INTERLAY, so choose greater of 2 + size_t resNonInter = TMultiBlobHeader::HeaderSize() + Blobs.size() * sizeof(ui64); + size_t resInterlay = TMultiBlobHeader::HeaderSize(); + for (size_t i = 0; i < Blobs.size(); ++i) { + resInterlay = AlignUp<ui64>(resInterlay, sizeof(ui64)) + sizeof(ui64); + resInterlay = AlignUp<ui64>(resInterlay, ALIGN) + Blobs[i]->GetLength(); + resNonInter = AlignUp<ui64>(resNonInter, ALIGN) + Blobs[i]->GetLength(); + } + resInterlay = AlignUp<ui64>(resInterlay, ALIGN); + resNonInter = AlignUp<ui64>(resNonInter, ALIGN); + return Max(resNonInter, resInterlay); +} + +TMultiBlobBuilder::TSavers& TMultiBlobBuilder::GetBlobs() { + return Blobs; +} + +const TMultiBlobBuilder::TSavers& TMultiBlobBuilder::GetBlobs() const { + return Blobs; +} + +void TMultiBlobBuilder::AddBlob(IBlobSaverBase* blob) { + Blobs.push_back(blob); +} + +void TMultiBlobBuilder::DeleteSubBlobs() { + for (size_t i = 0; i < Blobs.size(); ++i) + delete Blobs[i]; + Blobs.clear(); +} diff --git a/library/cpp/on_disk/multi_blob/multiblob_builder.h b/library/cpp/on_disk/multi_blob/multiblob_builder.h new file mode 100644 index 0000000000..a8e3c6d35e --- /dev/null +++ b/library/cpp/on_disk/multi_blob/multiblob_builder.h @@ -0,0 +1,64 @@ +#pragma once + +#include <util/system/align.h> +#include <util/stream/output.h> +#include <util/stream/file.h> +#include <util/draft/holder_vector.h> + +#include "multiblob.h" + +class IBlobSaverBase { +public: + virtual ~IBlobSaverBase() { + } + virtual void Save(IOutputStream& output, ui32 flags = 0) = 0; + virtual size_t GetLength() = 0; +}; + +inline void MultiBlobSave(IOutputStream& output, IBlobSaverBase& saver) { + saver.Save(output); +} + +class TBlobSaverMemory: public IBlobSaverBase { +public: + TBlobSaverMemory(const void* ptr, size_t size); + TBlobSaverMemory(const TBlob& blob); + void Save(IOutputStream& output, ui32 flags = 0) override; + size_t GetLength() override; + +private: + TBlob Blob; +}; + +class TBlobSaverFile: public IBlobSaverBase { +public: + TBlobSaverFile(TFile file); + TBlobSaverFile(const char* filename, EOpenMode oMode = RdOnly); + void Save(IOutputStream& output, ui32 flags = 0) override; + size_t GetLength() override; + +protected: + TFile File; +}; + +class TMultiBlobBuilder: public IBlobSaverBase { +protected: + // Data will be stored with default alignment DEVTOOLS-4548 + static const size_t ALIGN = 16; + +public: + typedef TVector<IBlobSaverBase*> TSavers; + + TMultiBlobBuilder(bool isOwn = true); + ~TMultiBlobBuilder() override; + void Save(IOutputStream& output, ui32 flags = 0) override; + size_t GetLength() override; + TSavers& GetBlobs(); + const TSavers& GetBlobs() const; + void AddBlob(IBlobSaverBase* blob); + void DeleteSubBlobs(); + +protected: + TSavers Blobs; + bool IsOwner; +}; diff --git a/library/cpp/on_disk/multi_blob/ya.make b/library/cpp/on_disk/multi_blob/ya.make new file mode 100644 index 0000000000..50615fc901 --- /dev/null +++ b/library/cpp/on_disk/multi_blob/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + multiblob.cpp + multiblob_builder.cpp +) + +PEERDIR( + library/cpp/on_disk/chunks + util/draft +) + +END() |