aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/on_disk/chunks
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/on_disk/chunks
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/on_disk/chunks')
-rw-r--r--library/cpp/on_disk/chunks/chunked_helpers.cpp67
-rw-r--r--library/cpp/on_disk/chunks/chunked_helpers.h674
-rw-r--r--library/cpp/on_disk/chunks/chunks_ut.cpp329
-rw-r--r--library/cpp/on_disk/chunks/reader.cpp52
-rw-r--r--library/cpp/on_disk/chunks/reader.h57
-rw-r--r--library/cpp/on_disk/chunks/ut/ya.make9
-rw-r--r--library/cpp/on_disk/chunks/writer.cpp46
-rw-r--r--library/cpp/on_disk/chunks/writer.h57
-rw-r--r--library/cpp/on_disk/chunks/ya.make11
9 files changed, 1302 insertions, 0 deletions
diff --git a/library/cpp/on_disk/chunks/chunked_helpers.cpp b/library/cpp/on_disk/chunks/chunked_helpers.cpp
new file mode 100644
index 0000000000..b7adba2753
--- /dev/null
+++ b/library/cpp/on_disk/chunks/chunked_helpers.cpp
@@ -0,0 +1,67 @@
+#include <util/ysaveload.h>
+
+#include "chunked_helpers.h"
+
+// Returns the part of `blob` occupied by chunk number `index`.
+// The chunked-data footer of `blob` is parsed on every call.
+// Throws yexception when `index` is not below the number of chunks.
+TBlob GetBlock(const TBlob& blob, size_t index) {
+    const TChunkedDataReader chunks(blob);
+    const size_t blockCount = chunks.GetBlocksCount();
+    if (index >= blockCount) {
+        ythrow yexception() << "index " << index << " is >= than block count " << blockCount;
+    }
+
+    // Chunk pointers point into the blob, so the difference is the chunk's offset.
+    const char* const blobStart = static_cast<const char*>(blob.Data());
+    const char* const blockStart = static_cast<const char*>(chunks.GetBlock(index));
+    const size_t first = blockStart - blobStart;
+    const size_t last = first + chunks.GetBlockLen(index);
+    return blob.SubBlob(first, last);
+}
+
+/*************************** TNamedChunkedDataReader ***************************/
+
+static const char* NamedChunkedDataMagic = "NamedChunkedData";
+
+// Parses `blob` as chunked data whose last chunk is the name table:
+// the "NamedChunkedData" magic followed by the serialized vector of chunk names.
+// Throws yexception when there are no chunks or the last chunk lacks the magic.
+TNamedChunkedDataReader::TNamedChunkedDataReader(const TBlob& blob)
+    : TChunkedDataReader(blob)
+{
+    const size_t totalBlocks = TChunkedDataReader::GetBlocksCount();
+    if (totalBlocks < 1) {
+        throw yexception() << "Too few blocks";
+    }
+
+    // The name table is looked up in the very last chunk.
+    const size_t footer = totalBlocks - 1;
+    const size_t magicSize = strlen(NamedChunkedDataMagic);
+    const bool tooShort = GetBlockLen(footer) < magicSize;
+    if (tooShort || memcmp(GetBlock(footer), NamedChunkedDataMagic, magicSize) != 0) {
+        throw yexception() << "Not a valid named chunked data file";
+    }
+
+    const char* const payload = static_cast<const char*>(GetBlock(footer)) + magicSize;
+    TMemoryInput namesInput(payload, GetBlockLen(footer) - magicSize);
+    Load(&namesInput, Names);
+
+    // Empty names mark unnamed chunks; they are kept in Names but not indexed.
+    for (size_t i = 0; i < Names.size(); ++i) {
+        const TString& name = Names[i];
+        if (!name.empty()) {
+            NameToIndex[name] = i;
+        }
+    }
+}
+
+/*************************** TNamedChunkedDataWriter ***************************/
+
+TNamedChunkedDataWriter::TNamedChunkedDataWriter(IOutputStream& slave) // `slave` is handed to the base chunked writer
+ : TChunkedDataWriter(slave)
+{
+}
+
+TNamedChunkedDataWriter::~TNamedChunkedDataWriter() { // empty: the footer is NOT written here, call WriteFooter() explicitly
+}
+
+// Starts an unnamed block by forwarding to the named overload with an empty name.
+void TNamedChunkedDataWriter::NewBlock() {
+    const TString unnamed;
+    NewBlock(unnamed);
+}
+
+// Starts a new block and records `name` for it in the name table.
+// An empty name leaves the block unnamed. A non-empty name that is already
+// in use makes the call throw yexception before anything is recorded.
+void TNamedChunkedDataWriter::NewBlock(const TString& name) {
+    const bool named = !name.empty();
+    if (named && NameToIndex.count(name) != 0) {
+        throw yexception() << "Block name is not unique";
+    }
+    if (named) {
+        // The index of the new block equals the number of names recorded so far.
+        NameToIndex[name] = Names.size();
+    }
+    Names.push_back(name);
+    TChunkedDataWriter::NewBlock();
+}
+
+void TNamedChunkedDataWriter::WriteFooter() { // appends the name-table block, then the base chunked-data footer
+ NewBlock(""); // the name table gets its own unnamed block (which also adds an empty entry to Names)
+ Write(NamedChunkedDataMagic); // magic that TNamedChunkedDataReader checks at the start of the last block
+ Save(this, Names); // serialized block names, in block order
+ TChunkedDataWriter::WriteFooter();
+}
diff --git a/library/cpp/on_disk/chunks/chunked_helpers.h b/library/cpp/on_disk/chunks/chunked_helpers.h
new file mode 100644
index 0000000000..5fa96afdca
--- /dev/null
+++ b/library/cpp/on_disk/chunks/chunked_helpers.h
@@ -0,0 +1,674 @@
+#pragma once
+
+#include <util/generic/vector.h>
+#include <util/generic/buffer.h>
+#include <util/generic/hash_set.h>
+#include <util/generic/cast.h>
+#include <util/generic/ymath.h>
+#include <util/memory/blob.h>
+#include <util/stream/buffer.h>
+#include <util/stream/mem.h>
+#include <util/system/unaligned_mem.h>
+#include <util/ysaveload.h>
+
+#include "reader.h"
+#include "writer.h"
+
+#include <cmath>
+#include <cstddef>
+
+template <typename T> // read-only view over a blob laid out as: ui64 element count, then packed T values
+class TYVector {
+private:
+ ui32 Size;
+ const T* Data;
+
+public:
+ TYVector(const TBlob& blob) // nothing is copied: Data points into the blob's memory
+ : Size(IntegerCast<ui32>(ReadUnaligned<ui64>(blob.Data()))) // stored count is ui64, narrowed through IntegerCast
+ , Data((const T*)((const char*)blob.Data() + sizeof(ui64)))
+ {
+ }
+
+ void Get(size_t idx, T& t) const { // copies element idx into t using an unaligned read
+ assert(idx < (size_t)Size);
+ t = ReadUnaligned<T>(Data + idx);
+ }
+
+ const T& At(size_t idx) const { // returns a reference straight into the blob (plain access, unlike Get)
+ assert(idx < (size_t)Size);
+ return Data[idx];
+ }
+
+ size_t GetSize() const { // number of elements
+ return Size;
+ }
+
+ size_t RealSize() const { // bytes occupied in the blob: the ui64 count plus the elements
+ return sizeof(ui64) + Size * sizeof(T);
+ }
+
+ ~TYVector() = default;
+};
+
+template <typename T> // in-memory builder producing the layout that TYVector reads
+class TYVectorWriter {
+private:
+ TVector<T> Vector;
+
+public:
+ TYVectorWriter() = default;
+
+ void PushBack(const T& value) {
+ Vector.push_back(value);
+ }
+
+ void Save(IOutputStream& out) const { // writes the element count as ui64, then the raw bytes of all elements
+ ui64 uSize = (ui64)Vector.size();
+ out.Write(&uSize, sizeof(uSize));
+ out.Write(Vector.data(), Vector.size() * sizeof(T));
+ }
+
+ const T& At(size_t idx) const { // bounds are checked by assert only
+ assert(idx < Size());
+ return Vector[idx];
+ }
+
+ T& At(size_t idx) { // mutable access; bounds are checked by assert only
+ assert(idx < Size());
+ return Vector[idx];
+ }
+
+ void Clear() {
+ Vector.clear();
+ }
+
+ size_t Size() const {
+ return Vector.size();
+ }
+
+ void Resize(size_t size) {
+ Vector.resize(size);
+ }
+
+ void Resize(size_t size, const T& value) { // new elements, if any, are copies of value
+ Vector.resize(size, value);
+ }
+};
+
+template <typename T, bool> // type selector: TYVectorG<X, false>::T is the reader, TYVectorG<X, true>::T is the writer
+struct TYVectorG;
+
+template <typename X>
+struct TYVectorG<X, false> {
+ typedef TYVector<X> T;
+};
+
+template <typename X>
+struct TYVectorG<X, true> {
+ typedef TYVectorWriter<X> T;
+};
+
+template <typename T> // trait checked by TPackedPair's static_asserts; defaults to TTypeTraits<T>::IsPod
+struct TIsMemsetThisWithZeroesSupported {
+ enum {
+ Result = TTypeTraits<T>::IsPod
+ };
+};
+// MEMSET_THIS_WITH_ZEROES_SUPPORTED(type): explicit specialization that sets the trait to true for `type`.
+#define MEMSET_THIS_WITH_ZEROES_SUPPORTED(type) \
+ template <> \
+ struct TIsMemsetThisWithZeroesSupported<type> { \
+ enum { \
+ Result = true \
+ }; \
+ };
+
+class TPlainHashCommon { // pieces shared by TPlainHashWriter and TPlainHash: record types, version id, hash function
+protected:
+#pragma pack(push, 8)
+ template <typename TKey, typename TValue>
+ class TPackedPair { // key/value record that is written to and read from the stream as raw bytes
+ private:
+ typedef TPackedPair<TKey, TValue> TThis;
+ TKey Key;
+ TValue Value;
+
+ private:
+ static_assert(TIsMemsetThisWithZeroesSupported<TKey>::Result, "expect TIsMemsetThisWithZeroesSupported<TKey>::Result");
+ static_assert(TIsMemsetThisWithZeroesSupported<TValue>::Result, "expect TIsMemsetThisWithZeroesSupported<TValue>::Result");
+
+ /// to avoid uninitialized bytes
+ void Init(const TKey& key, const TValue& value) { // zeroes the whole object (padding included) before assigning the fields
+ memset(static_cast<TThis*>(this), 0, sizeof(TThis));
+ Key = key;
+ Value = value;
+ }
+
+ public:
+ TPackedPair(typename TTypeTraits<TKey>::TFuncParam key, typename TTypeTraits<TValue>::TFuncParam value) {
+ Init(key, value);
+ }
+
+ TPackedPair(const TThis& rhs) { // copies go through Init too, so padding bytes stay zero
+ Init(rhs.Key, rhs.Value);
+ }
+
+ TPackedPair& operator=(const TThis& rhs) {
+ if (this != &rhs) {
+ Init(rhs.Key, rhs.Value);
+ }
+ return *this;
+ }
+
+ TPackedPair() {
+ Init(TKey(), TValue());
+ }
+
+ typename TTypeTraits<TKey>::TFuncParam First() const {
+ return Key;
+ }
+
+ typename TTypeTraits<TValue>::TFuncParam Second() const {
+ return Value;
+ }
+
+ static TKey GetFirst(const void* self) { // reads the key from a possibly unaligned record address
+ static constexpr size_t offset = offsetof(TThis, Key);
+ return ReadUnaligned<TKey>(reinterpret_cast<const char*>(self) + offset);
+ }
+
+ static TValue GetSecond(const void* self) { // reads the value from a possibly unaligned record address
+ static constexpr size_t offset = offsetof(TThis, Value);
+ return ReadUnaligned<TValue>(reinterpret_cast<const char*>(self) + offset);
+ }
+ };
+#pragma pack(pop)
+
+protected:
+ static const ui16 VERSION_ID = 2; // first field written by TPlainHashWriter::Save; verified by TPlainHash::Init in debug builds
+
+#pragma pack(push, 8)
+ struct TInterval { // one bucket: index of its first record in the data array and the number of records
+ static const ui32 INVALID = (ui32)-1;
+ ui32 Offset;
+ ui32 Length;
+
+ TInterval()
+ : Offset(INVALID)
+ , Length(INVALID)
+ {
+ }
+
+ TInterval(ui32 offset, ui32 length)
+ : Offset(offset)
+ , Length(length)
+ {
+ }
+
+ static inline ui32 GetOffset(const TInterval* self) { // unaligned-safe read of Offset
+ static constexpr size_t offset = offsetof(TInterval, Offset);
+ return ReadUnaligned<ui32>(reinterpret_cast<const char*>(self) + offset);
+ }
+
+ static inline ui32 GetLength(const TInterval* self) { // unaligned-safe read of Length
+ static constexpr size_t offset = offsetof(TInterval, Length);
+ return ReadUnaligned<ui32>(reinterpret_cast<const char*>(self) + offset);
+ }
+ };
+#pragma pack(pop)
+ static_assert(8 == sizeof(TInterval), "expect 8 == sizeof(TInterval)");
+
+ template <typename TKey>
+ static ui32 KeyHash(typename TTypeTraits<TKey>::TFuncParam key, ui16 bits) { // bucket index = lowest `bits` bits of the key converted to ui32
+ Y_ASSERT(bits < 32);
+ const ui32 res = ui32(key) & ((ui32(1) << bits) - 1);
+
+ Y_ASSERT(res < (ui32(1) << bits));
+ return res;
+ }
+};
+
+template <typename TKey, typename TValue> // collects key/value pairs and serializes them as a bucketed table readable by TPlainHash
+class TPlainHashWriter : TPlainHashCommon {
+private:
+ typedef TPackedPair<TKey, TValue> TKeyValuePair;
+ typedef TVector<TKeyValuePair> TData;
+ TData Data;
+ typedef TVector<TData> TData2;
+
+ bool IsPlainEnought(ui16 bits) const { // true when, with 2^bits buckets, no bucket would receive more than two keys
+ TVector<size_t> counts(1LL << bits, 0);
+ for (size_t i = 0; i < Data.size(); ++i) {
+ size_t& count = counts[KeyHash<TKey>(TKeyValuePair::GetFirst(&Data[i]), bits)];
+ ++count;
+ if (count > 2)
+ return false;
+ }
+ return true;
+ }
+
+public:
+ void Add(const TKey& key, const TValue& value) { // no duplicate check here; Save() checks for clashes in debug builds only
+ Data.push_back(TKeyValuePair(key, value));
+ }
+
+ void Save(IOutputStream& out) const { // layout: ui16 version, ui32 pair size, ui16 bits, ui32 pair count, intervals[2^bits], pairs grouped by bucket
+ Y_ASSERT(Data.size() < Max<ui32>());
+
+ WriteBin<ui16>(&out, VERSION_ID);
+ static const ui32 PAIR_SIZE = sizeof(TKeyValuePair);
+ WriteBin<ui32>(&out, PAIR_SIZE);
+
+ ui16 bits;
+ if (!Data.empty()) {
+ bits = (ui16)(log((float)Data.size()) / log(2.f)); // start near log2 of the pair count
+ while ((bits < 22) && !IsPlainEnought(bits)) // grow the table until buckets are small enough, but never beyond 22 bits
+ ++bits;
+ } else {
+ bits = 0;
+ }
+ WriteBin<ui16>(&out, bits);
+ WriteBin<ui32>(&out, (ui32)Data.size());
+
+ const ui32 nBuckets = ui32(1) << bits;
+ TData2 data2(nBuckets);
+ for (size_t i = 0; i < Data.size(); ++i) // distribute pairs into their buckets, keeping insertion order inside a bucket
+ data2[KeyHash<TKey>(TKeyValuePair::GetFirst(&Data[i]), bits)].push_back(Data[i]);
+
+ typedef TVector<TInterval> TIntervals;
+ TIntervals intervals(nBuckets);
+ ui32 offset = 0;
+ for (ui32 i = 0; i < nBuckets; ++i) { // offsets are counted in pairs (running total of bucket sizes), not in bytes
+ intervals[i].Offset = offset;
+ intervals[i].Length = (ui32)data2[i].size();
+ offset += (ui32)data2[i].size();
+ }
+#ifndef NDEBUG
+ for (ui32 i = 0; i < nBuckets; ++i) { // debug-only: equal keys always share a bucket, so a per-bucket scan finds all duplicates
+ for (size_t j = 0; j < data2[i].size(); ++j)
+ for (size_t k = j + 1; k < data2[i].size(); ++k)
+ if (TKeyValuePair::GetFirst(&data2[i][j]) == TKeyValuePair::GetFirst(&data2[i][k]))
+ ythrow yexception() << "key clash";
+ }
+#endif
+ out.Write(intervals.data(), intervals.size() * sizeof(intervals[0]));
+ for (ui32 i = 0; i < nBuckets; ++i)
+ out.Write(data2[i].data(), data2[i].size() * sizeof(data2[i][0]));
+ }
+};
+
+template <typename TKey, typename TValue> // read-only lookup table over bytes produced by TPlainHashWriter::Save; the memory is not owned
+class TPlainHash : TPlainHashCommon {
+private:
+ typedef TPackedPair<TKey, TValue> TKeyValuePair;
+
+ const char* P;
+
+ ui16 GetBits() const { // header offset 6: after ui16 version and ui32 pair size
+ return ReadUnaligned<ui16>(P + 6);
+ }
+
+ ui32 GetSize() const { // header offset 8: total number of pairs
+ return ReadUnaligned<ui32>(P + 8);
+ }
+
+ const TInterval* GetIntervals() const { // bucket table starts right after the 12-byte header
+ return (const TInterval*)(P + 12);
+ }
+
+ const TKeyValuePair* GetData() const { // pairs follow the 2^bits bucket intervals
+ return (const TKeyValuePair*)(GetIntervals() + (1ULL << GetBits()));
+ }
+
+ template <typename T>
+ void Init(const T* p) { // stores the pointer; version and pair size are validated only when NDEBUG is not defined
+ static_assert(sizeof(T) == 1, "expect sizeof(T) == 1");
+ P = reinterpret_cast<const char*>(p);
+#ifndef NDEBUG
+ ui16 version = ReadUnaligned<ui16>(p);
+ if (version != VERSION_ID)
+ ythrow yexception() << "bad version: " << version;
+ static const ui32 PAIR_SIZE = sizeof(TKeyValuePair);
+ const ui32 size = ReadUnaligned<ui32>(p + 2);
+ if (size != PAIR_SIZE)
+ ythrow yexception() << "bad size " << size << " instead of " << PAIR_SIZE;
+#endif
+ }
+
+public:
+ typedef const TKeyValuePair* TConstIterator;
+
+ TPlainHash(const char* p) { // p must stay valid for as long as this object is used
+ Init(p);
+ }
+
+ TPlainHash(const TBlob& blob) { // only the blob's start pointer is kept
+ Init(blob.Begin());
+ }
+
+ bool Find(typename TTypeTraits<TKey>::TFuncParam key, TValue* res) const { // linear scan of the key's bucket; *res is written only on success
+ // Cerr << GetBits() << "\t" << (1 << GetBits()) << "\t" << GetSize() << Endl;
+ const ui32 hash = KeyHash<TKey>(key, GetBits());
+ const TInterval* intervalPtr = GetIntervals();
+ const TKeyValuePair* pair = GetData() + TInterval::GetOffset(intervalPtr + hash);
+ const ui32 length = TInterval::GetLength(intervalPtr + hash);
+ for (ui32 i = 0; i < length; ++i, ++pair) {
+ if (TKeyValuePair::GetFirst(pair) == key) {
+ *res = TKeyValuePair::GetSecond(pair);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ TValue Get(typename TTypeTraits<TKey>::TFuncParam key) const { // like Find, but throws yexception when the key is absent
+ TValue res;
+ if (Find(key, &res))
+ return res;
+ else
+ ythrow yexception() << "key not found";
+ }
+
+ TConstIterator Begin() const { // iteration visits pairs in bucket order
+ return GetData();
+ }
+
+ TConstIterator End() const {
+ return GetData() + GetSize();
+ }
+
+ const char* ByteEnd() const { // address one past the last serialized pair
+ return (const char*)(GetData() + GetSize());
+ }
+
+ size_t ByteSize() const { // 12-byte header + bucket intervals + pairs
+ return 12 + sizeof(TInterval) * (size_t(1) << GetBits()) + sizeof(TKeyValuePair) * GetSize();
+ }
+};
+
+template <typename Key, typename Value, bool> // type selector: false -> TPlainHash (reader), true -> TPlainHashWriter
+struct TPlainHashG;
+
+template <typename Key, typename Value>
+struct TPlainHashG<Key, Value, false> {
+ typedef TPlainHash<Key, Value> T;
+};
+
+template <typename Key, typename Value>
+struct TPlainHashG<Key, Value, true> {
+ typedef TPlainHashWriter<Key, Value> T;
+};
+
+template <typename T> // read-only view of a single T stored at the beginning of a blob; nothing is copied
+class TSingleValue {
+private:
+ const T* Value;
+
+public:
+ TSingleValue(const TBlob& blob) { // size sanity checks are Y_ASSERT-only
+ Y_ASSERT(blob.Length() >= sizeof(T));
+ Y_ASSERT(blob.Length() <= sizeof(T) + 16); // presumably leaves room for block padding - TODO confirm
+ Value = reinterpret_cast<const T*>(blob.Begin());
+ }
+
+ const T& Get() const { // reference into the blob's memory
+ return *Value;
+ }
+};
+
+template <typename T> // holds one T and writes its raw bytes, the counterpart of TSingleValue
+class TSingleValueWriter {
+private:
+ T Value;
+
+public:
+ TSingleValueWriter() = default; // Value is default-initialized, i.e. left indeterminate for trivial T until Set() is called
+
+ TSingleValueWriter(const T& value)
+ : Value(value)
+ {
+ }
+
+ void Set(const T& value) {
+ Value = value;
+ }
+
+ void Save(IOutputStream& out) const { // writes exactly sizeof(T) bytes
+ out.Write(&Value, sizeof(Value));
+ }
+};
+
+TBlob GetBlock(const TBlob& data, size_t index);
+
+template <class T> // opens a new block in writer and serializes t into it via t.Save(writer)
+void WriteBlock(TChunkedDataWriter& writer, const T& t) {
+ writer.NewBlock();
+ t.Save(writer);
+}
+
+template <class T> // same as the const overload, for objects passed as non-const lvalues
+void WriteBlock(TChunkedDataWriter& writer, T& t) {
+ writer.NewBlock();
+ t.Save(writer);
+}
+
+// Extends TChunkedDataWriter, allowing user to name blocks with arbitrary strings.
+class TNamedChunkedDataWriter: public TChunkedDataWriter {
+public:
+ TNamedChunkedDataWriter(IOutputStream& slave);
+ ~TNamedChunkedDataWriter() override;
+
+ // Start a new unnamed block, hides the non-virtual TChunkedDataWriter::NewBlock().
+ void NewBlock();
+
+ // Start a new block with given name (possibly empty, in which case block is unnamed).
+ // Throws an exception if name is a duplicate.
+ void NewBlock(const TString& name);
+
+ void WriteFooter(); // writes the name-table block followed by the base footer; hides TChunkedDataWriter::WriteFooter()
+
+private:
+ TVector<TString> Names; // one entry per block in creation order, empty string for unnamed blocks
+ THashMap<TString, size_t> NameToIndex; // non-empty names only, used to detect duplicates
+};
+
+class TNamedChunkedDataReader: public TChunkedDataReader { // chunked-data reader that also resolves blocks by the names stored in the last block
+public:
+ TNamedChunkedDataReader(const TBlob& blob);
+
+ inline bool HasBlock(const char* name) const {
+ return NameToIndex.find(name) != NameToIndex.end();
+ }
+
+ inline size_t GetIndexByName(const char* name) const { // throws yexception when no block has this name
+ THashMap<TString, size_t>::const_iterator it = NameToIndex.find(name);
+ if (it == NameToIndex.end())
+ throw yexception() << "Block \"" << name << "\" is not found";
+ else
+ return it->second;
+ }
+
+ // Returns number of blocks written to the file by user of TNamedChunkedDataReader.
+ inline size_t GetBlocksCount() const {
+ // Last block is for internal usage
+ return TChunkedDataReader::GetBlocksCount() - 1;
+ }
+
+ inline const char* GetBlockName(size_t index) const { // empty string for unnamed blocks; the index is checked by Y_ASSERT only
+ Y_ASSERT(index < GetBlocksCount());
+ return Names[index].data();
+ }
+
+ inline const void* GetBlockByName(const char* name) const { // throws if the name is unknown
+ return GetBlock(GetIndexByName(name));
+ }
+
+ inline size_t GetBlockLenByName(const char* name) const { // throws if the name is unknown
+ return GetBlockLen(GetIndexByName(name));
+ }
+
+ inline TBlob GetBlobByName(const char* name) const { // non-owning blob (TBlob::NoCopy) over the block; throws if the name is unknown
+ size_t id = GetIndexByName(name);
+ return TBlob::NoCopy(GetBlock(id), GetBlockLen(id));
+ }
+
+ inline bool GetBlobByName(const char* name, TBlob& blob) const { // non-throwing lookup: returns false and leaves blob untouched when the name is unknown
+ THashMap<TString, size_t>::const_iterator it = NameToIndex.find(name);
+ if (it == NameToIndex.end())
+ return false;
+ blob = TBlob::NoCopy(GetBlock(it->second), GetBlockLen(it->second));
+ return true;
+ }
+
+private:
+ TVector<TString> Names; // names as loaded from the last block
+ THashMap<TString, size_t> NameToIndex; // non-empty names -> block index
+};
+
+template <class T> // element (de)serialization hook for the non-POD general vector; the default delegates to TSerializer<T>
+struct TSaveLoadVectorNonPodElement {
+ static inline void Save(IOutputStream* out, const T& t) {
+ TSerializer<T>::Save(out, t);
+ }
+
+ static inline void Load(IInputStream* in, T& t, size_t elementSize) { // elementSize is only sanity-checked here
+ Y_ASSERT(elementSize > 0);
+ TSerializer<T>::Load(in, t);
+ }
+};
+
+template <class T, bool isPod> // primary template: reader for variable-size elements; layout is ui64 offsets count, ui64 offsets[], element bytes
+class TVectorTakingIntoAccountThePodType {
+private:
+ ui64 SizeofOffsets; // number of stored offsets = element count + 1 (the last one marks the end of the data)
+ const ui64* Offsets;
+ const char* Data;
+
+public:
+ TVectorTakingIntoAccountThePodType(const TBlob& blob) { // nothing is copied: pointers refer to the blob's memory
+ SizeofOffsets = ReadUnaligned<ui64>(blob.Begin());
+ Y_ASSERT(SizeofOffsets > 0);
+ Offsets = reinterpret_cast<const ui64*>(blob.Begin() + sizeof(ui64));
+ Data = reinterpret_cast<const char*>(blob.Begin() + sizeof(ui64) + SizeofOffsets * sizeof(ui64));
+ }
+
+ size_t GetSize() const { // number of elements
+ return (size_t)(SizeofOffsets - 1);
+ }
+
+ size_t GetLength(ui64 index) const { // serialized size of element index; throws yexception for an out-of-range index
+ if (index + 1 >= SizeofOffsets)
+ ythrow yexception() << "bad offset";
+ return IntegerCast<size_t>(ReadUnaligned<ui64>(Offsets + index + 1) - ReadUnaligned<ui64>(Offsets + index));
+ }
+
+ void Get(ui64 index, T& t) const { // deserializes element index into t; throws for an out-of-range index (via GetLength)
+ const size_t len = GetLength(index);
+ TMemoryInput input(Data + ReadUnaligned<ui64>(Offsets + index), len);
+ TSaveLoadVectorNonPodElement<T>::Load(&input, t, len);
+ }
+
+ T Get(ui64 index) const { // by-value convenience overload; requires T to be default-constructible
+ T ret;
+ Get(index, ret);
+ return ret;
+ }
+
+ size_t RealSize() const { // total bytes: count word + offsets + data (the last offset equals the data size)
+ return sizeof(ui64) * (SizeofOffsets + 1) + ReadUnaligned<ui64>(Offsets + SizeofOffsets - 1);
+ }
+};
+
+template <class T, bool isPod> // primary template: writer for variable-size elements, counterpart of the reader above
+class TVectorTakingIntoAccountThePodTypeWriter : TNonCopyable {
+private:
+ typedef TVector<ui64> TOffsets;
+ TOffsets Offsets; // start offset of every element inside Data
+ TBuffer Data; // serialized elements, back to back
+ TBufferOutput DataStream; // stream that appends to Data
+
+public:
+ TVectorTakingIntoAccountThePodTypeWriter()
+ : DataStream(Data)
+ {
+ }
+
+ void PushBack(const T& t) { // records the current end of Data as the element's offset, then serializes it
+ Offsets.push_back((ui64) Data.size());
+ TSaveLoadVectorNonPodElement<T>::Save(&DataStream, t);
+ }
+
+ size_t Size() const { // number of elements pushed so far
+ return Offsets.size();
+ }
+
+ void Save(IOutputStream& out) const { // writes: offsets count (elements + 1), element offsets, final end offset, element bytes
+ ui64 sizeofOffsets = Offsets.size() + 1;
+ out.Write(&sizeofOffsets, sizeof(sizeofOffsets));
+ out.Write(Offsets.data(), Offsets.size() * sizeof(Offsets[0]));
+ ui64 lastOffset = (ui64) Data.size(); // sentinel offset so the reader can compute the last element's length
+ out.Write(&lastOffset, sizeof(lastOffset));
+ out.Write(Data.data(), Data.size());
+ }
+};
+
+template <class T> // POD case: elements are fixed-size, so the plain TYVector layout is used
+class TVectorTakingIntoAccountThePodType<T, true>: public TYVector<T> {
+public:
+ TVectorTakingIntoAccountThePodType(const TBlob& blob)
+ : TYVector<T>(blob)
+ {
+ }
+};
+
+template <class T> // POD case: reuses TYVectorWriter unchanged
+class TVectorTakingIntoAccountThePodTypeWriter<T, true>: public TYVectorWriter<T> {
+};
+
+template <typename T> // reader that picks the POD or non-POD layout automatically from TTypeTraits<T>::IsPod
+class TGeneralVector: public TVectorTakingIntoAccountThePodType<T, TTypeTraits<T>::IsPod> {
+ typedef TVectorTakingIntoAccountThePodType<T, TTypeTraits<T>::IsPod> TBase;
+
+public:
+ TGeneralVector(const TBlob& blob)
+ : TBase(blob)
+ {
+ }
+};
+
+template <typename T> // writer counterpart of TGeneralVector, selected by the same TTypeTraits<T>::IsPod switch
+class TGeneralVectorWriter: public TVectorTakingIntoAccountThePodTypeWriter<T, TTypeTraits<T>::IsPod> {
+};
+
+template <typename TItem, bool> // type selector: false -> TGeneralVector (reader), true -> TGeneralVectorWriter
+struct TGeneralVectorG;
+
+template <typename TItem>
+struct TGeneralVectorG<TItem, false> {
+ typedef TGeneralVector<TItem> T;
+};
+
+template <typename TItem>
+struct TGeneralVectorG<TItem, true> {
+ typedef TGeneralVectorWriter<TItem> T;
+};
+
+template <> // strings are stored as their bytes plus a terminating zero, without a length prefix
+struct TSaveLoadVectorNonPodElement<TString> {
+ static inline void Save(IOutputStream* out, const TString& s) {
+ out->Write(s.data(), s.size() + 1); // +1 also writes the terminating zero
+ }
+
+ static inline void Load(TMemoryInput* in, TString& s, size_t elementSize) { // reads straight from the input's buffer; the stream position is not advanced
+ Y_ASSERT(elementSize > 0 && in->Avail() >= elementSize);
+ s.assign(in->Buf(), elementSize - 1); /// excluding 0 at the end
+ }
+};
+
+template <bool G> // reader/writer selector for vectors of TString
+struct TStringsVectorG: public TGeneralVectorG<TString, G> {
+};
+
+using TStringsVector = TGeneralVector<TString>; // reader
+using TStringsVectorWriter = TGeneralVectorWriter<TString>; // writer
diff --git a/library/cpp/on_disk/chunks/chunks_ut.cpp b/library/cpp/on_disk/chunks/chunks_ut.cpp
new file mode 100644
index 0000000000..f727647f7f
--- /dev/null
+++ b/library/cpp/on_disk/chunks/chunks_ut.cpp
@@ -0,0 +1,329 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/stream/file.h>
+#include <util/system/filemap.h>
+#include <util/system/tempfile.h>
+
+#include "chunked_helpers.h"
+
+/// Data for TChunkedHelpersTest::TestGeneralVector
+struct TPodStruct { // test item with a custom TSaveLoadVectorNonPodElement specialization (defined right after it)
+ int x;
+ float y;
+ TPodStruct(int _x = 0, float _y = 0)
+ : x(_x)
+ , y(_y)
+ {
+ }
+};
+/// And its serialization
+template <> // field-by-field serialization: x first, then y
+struct TSaveLoadVectorNonPodElement<TPodStruct> {
+ typedef TPodStruct TItem;
+ static inline void Save(IOutputStream* out, const TItem& item) {
+ TSerializer<int>::Save(out, item.x);
+ TSerializer<float>::Save(out, item.y);
+ }
+
+ static inline void Load(IInputStream* in, TItem& item, size_t elementSize) { // fields are read back in the order Save wrote them
+ Y_ASSERT(elementSize == sizeof(TItem));
+ TSerializer<int>::Load(in, item.x);
+ TSerializer<float>::Load(in, item.y);
+ }
+};
+
+class TChunkedHelpersTest: public TTestBase {
+ UNIT_TEST_SUITE(TChunkedHelpersTest);
+ UNIT_TEST(TestHash)
+ UNIT_TEST(TestGeneralVector)
+ UNIT_TEST(TestStrings);
+ UNIT_TEST(TestNamedChunkedData);
+ UNIT_TEST_SUITE_END();
+
+public:
+ void TestHash() { // round-trips TPlainHashWriter -> TPlainHash for two key/value type combinations
+ {
+ TBufferStream stream;
+ {
+ TPlainHashWriter<ui64, ui16> writer;
+ writer.Add(5, 7);
+ writer.Save(stream);
+ }
+
+ {
+ TBlob temp = TBlob::FromStreamSingleThreaded(stream);
+ TPlainHash<ui64, ui16> reader(temp);
+ ui16 value = 0;
+ UNIT_ASSERT(reader.Find(5, &value));
+ UNIT_ASSERT_EQUAL(7, value);
+ UNIT_ASSERT(!reader.Find(6, &value)); // a key that was never added must not be found
+ }
+ }
+
+ {
+ TBufferStream stream;
+ int v = 1;
+ wchar16 k = 'a';
+ {
+ TPlainHashWriter<wchar16, void*> writer; // pointer values are stored as raw pointer bits
+ writer.Add(k, &v);
+ writer.Save(stream);
+ }
+ {
+ TBlob temp = TBlob::FromStreamSingleThreaded(stream);
+ TPlainHash<wchar16, void*> reader(temp);
+ void* value = nullptr;
+ UNIT_ASSERT(reader.Find(k, &value));
+ UNIT_ASSERT_EQUAL((int*)value, &v);
+ }
+ }
+ }
+
+ void TestGeneralVector() { // round-trips TGeneralVectorWriter -> TGeneralVector for several element types and checks RealSize()
+ { /// ui32
+ const size_t N = 3;
+ TBufferStream stream;
+ {
+ TGeneralVectorWriter<ui32> writer;
+ for (size_t i = 0; i < N; ++i)
+ writer.PushBack(i);
+ writer.Save(stream);
+ }
+ {
+ TBlob temp = TBlob::FromStreamSingleThreaded(stream);
+ TGeneralVector<ui32> reader(temp);
+ UNIT_ASSERT_EQUAL(reader.GetSize(), N);
+ for (size_t i = 0; i < N; ++i) {
+ ui32 value;
+ reader.Get(i, value);
+ UNIT_ASSERT_EQUAL(value, i);
+ UNIT_ASSERT_EQUAL(reader.At(i), i);
+ }
+ UNIT_ASSERT_EQUAL(reader.RealSize(), sizeof(ui64) + N * sizeof(ui32)); // POD layout: count word + packed elements
+ }
+ }
+ { /// TString
+ const size_t N = 4;
+ TBufferStream stream;
+ {
+ TGeneralVectorWriter<TString> writer;
+ for (size_t i = 0; i < N; ++i)
+ writer.PushBack(ToString(i));
+ writer.Save(stream);
+ }
+ {
+ TBlob temp = TBlob::FromStreamSingleThreaded(stream);
+ TGeneralVector<TString> reader(temp);
+ UNIT_ASSERT_EQUAL(reader.GetSize(), N);
+ for (size_t i = 0; i < N; ++i) {
+ TString value;
+ reader.Get(i, value);
+ UNIT_ASSERT_EQUAL(value, ToString(i));
+ UNIT_ASSERT_EQUAL(reader.Get(i), ToString(i));
+ }
+ UNIT_ASSERT_EQUAL(reader.RealSize(), sizeof(ui64) * (N + 2) + N * 2); // count word + N+1 offsets, then one digit plus terminating zero per string
+ }
+ }
+ { /// some other struct
+ typedef TPodStruct TItem;
+ const size_t N = 2;
+ TBufferStream stream;
+ {
+ TGeneralVectorWriter<TItem> writer;
+ writer.PushBack(TItem(1, 2));
+ writer.PushBack(TItem(3, 4));
+ writer.Save(stream);
+ }
+ {
+ TBlob temp = TBlob::FromStreamSingleThreaded(stream);
+ TGeneralVector<TItem> reader(temp);
+ UNIT_ASSERT_EQUAL(reader.GetSize(), N);
+
+ TItem value;
+ reader.Get(0, value);
+ UNIT_ASSERT(value.x == 1 && value.y == 2.0);
+
+ reader.Get(1, value);
+ UNIT_ASSERT(value.x == 3 && value.y == 4.0);
+
+ UNIT_ASSERT_EQUAL(reader.RealSize(), sizeof(ui64) * (N + 2) + N * sizeof(TItem)); // the expected size shows the offsets-based (non-POD) layout is used here
+ }
+ }
+ { /// pointer
+ const size_t N = 3;
+ TVector<int> data_holder(N);
+ int* a = &(data_holder[0]);
+ TBufferStream stream;
+ {
+ TGeneralVectorWriter<int*> writer;
+ for (size_t i = 0; i < N; ++i) {
+ a[i] = i;
+ writer.PushBack(a + i);
+ }
+ writer.Save(stream);
+ }
+ {
+ TBlob temp = TBlob::FromStreamSingleThreaded(stream);
+ TGeneralVector<int*> reader(temp);
+ UNIT_ASSERT_EQUAL(reader.GetSize(), N);
+ for (size_t i = 0; i < N; ++i) {
+ int* value;
+ reader.Get(i, value);
+ UNIT_ASSERT_EQUAL(value, a + i);
+ UNIT_ASSERT_EQUAL(reader.At(i), a + i);
+ }
+ UNIT_ASSERT_EQUAL(reader.RealSize(), sizeof(ui64) + N * sizeof(int*)); // pointers take the POD layout
+ }
+ }
+ { /// std::pair<int, int>
+ typedef std::pair<int, int> TItem;
+ const size_t N = 3;
+ TBufferStream stream;
+ {
+ TGeneralVectorWriter<TItem> writer;
+ for (size_t i = 0; i < N; ++i)
+ writer.PushBack(TItem(i, i));
+ writer.Save(stream);
+ }
+ {
+ TBlob temp = TBlob::FromStreamSingleThreaded(stream);
+ TGeneralVector<TItem> reader(temp);
+ UNIT_ASSERT_EQUAL(reader.GetSize(), N);
+ for (size_t i = 0; i < N; ++i) {
+ TItem value;
+ reader.Get(i, value);
+ UNIT_ASSERT_EQUAL(value, TItem(i, i));
+ }
+ UNIT_ASSERT_EQUAL(reader.RealSize(), sizeof(ui64) + N * sizeof(TItem)); // the expected size matches the POD layout
+ }
+ }
+ }
+
+ void TestStrings() { // writes a strings vector as block 0 of a chunked file and reads it back from disk
+ const TString FILENAME = "chunked_helpers_test.bin";
+ TTempFileHandle file(FILENAME.c_str());
+
+ {
+ TFixedBufferFileOutput fOut(FILENAME);
+ TStringsVectorWriter stringsWriter;
+ stringsWriter.PushBack(""); // the empty string must survive the round trip too
+ stringsWriter.PushBack("test");
+ TChunkedDataWriter writer(fOut);
+ WriteBlock(writer, stringsWriter);
+ writer.WriteFooter();
+ }
+
+ {
+ TBlob fIn = TBlob::FromFileSingleThreaded(FILENAME);
+ TStringsVector vct(GetBlock(fIn, 0));
+ UNIT_ASSERT_EQUAL(vct.Get(0), "");
+ UNIT_ASSERT_EQUAL(vct.Get(1), "test");
+
+ bool wasException = false;
+ try {
+ vct.Get(2); // index 2 is out of range, so this must throw
+ } catch (...) {
+ wasException = true;
+ }
+ UNIT_ASSERT(wasException);
+ }
+ }
+
+ void TestNamedChunkedData() { // writes two named blocks and one unnamed block, then checks names, lengths and contents
+ const TString filename = MakeTempName(nullptr, "named_chunked_data_test");
+ TTempFile file(filename);
+
+ {
+ TFixedBufferFileOutput fOut(filename);
+ TNamedChunkedDataWriter writer(fOut);
+
+ writer.NewBlock("alpha");
+ writer.Write("123456");
+
+ writer.NewBlock();
+ writer.Write("anonymous");
+
+ writer.NewBlock("omega");
+ writer.Write("12345678901234567");
+
+ writer.WriteFooter();
+ }
+
+ {
+ TBlob mf = TBlob::FromFileSingleThreaded(filename);
+ TNamedChunkedDataReader reader(mf);
+
+ UNIT_ASSERT(reader.GetBlocksCount() == 3); // the internal name-table block is not counted
+
+ UNIT_ASSERT(reader.HasBlock("alpha"));
+ UNIT_ASSERT(reader.HasBlock("omega"));
+
+ UNIT_ASSERT_STRINGS_EQUAL(reader.GetBlockName(0), "alpha");
+ UNIT_ASSERT_STRINGS_EQUAL(reader.GetBlockName(1), ""); // unnamed block
+ UNIT_ASSERT_STRINGS_EQUAL(reader.GetBlockName(2), "omega");
+
+ UNIT_ASSERT_EQUAL(reader.GetBlockLenByName("alpha"), 6); // padding not included
+ UNIT_ASSERT_EQUAL(reader.GetBlockLenByName("omega"), 17);
+
+ UNIT_ASSERT(memcmp(reader.GetBlockByName("alpha"), "123456", 6) == 0);
+ UNIT_ASSERT(memcmp(reader.GetBlock(1), "anonymous", 9) == 0);
+ UNIT_ASSERT(memcmp(reader.GetBlockByName("omega"), "12345678901234567", 17) == 0);
+ }
+ }
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TChunkedHelpersTest);
+
+class TChunkedDataTest: public TTestBase { // checks block lengths reported by TChunkedDataReader for data written by TChunkedDataWriter
+private:
+ UNIT_TEST_SUITE(TChunkedDataTest);
+ UNIT_TEST(Test)
+ UNIT_TEST(TestEmpty)
+ UNIT_TEST_SUITE_END();
+
+ void Test() { // four blocks, one of them empty
+ TBuffer buffer;
+ {
+ TBufferOutput out(buffer);
+ TChunkedDataWriter writer(out);
+ writer.NewBlock();
+ writer << "test";
+ writer.NewBlock();
+ writer << 4; // the test expects this block to be 1 byte long
+ writer.NewBlock(); // left empty on purpose
+ writer.NewBlock();
+ writer << 1;
+ writer << 2;
+ writer.WriteFooter();
+ }
+ {
+ TBlob blob = TBlob::FromBufferSingleThreaded(buffer);
+ TChunkedDataReader data(blob);
+ // printf("%d\n", (int)data.GetBlockLen(3));
+ UNIT_ASSERT_EQUAL(4, data.GetBlockLen(0));
+ UNIT_ASSERT_EQUAL(1, data.GetBlockLen(1));
+ UNIT_ASSERT_EQUAL(0, data.GetBlockLen(2));
+ UNIT_ASSERT_EQUAL(2, data.GetBlockLen(3));
+ }
+ }
+
+ void TestEmpty() { // two blocks with no payload at all must both report zero length
+ TBuffer buffer;
+ {
+ TBufferOutput out(buffer);
+ TChunkedDataWriter writer(out);
+ writer.NewBlock();
+ writer.NewBlock();
+ writer.WriteFooter();
+ }
+ {
+ TBlob blob = TBlob::FromBufferSingleThreaded(buffer);
+ TChunkedDataReader data(blob);
+ // printf("%d\n", (int)data.GetBlockLen(1));
+ UNIT_ASSERT_EQUAL(0, data.GetBlockLen(0));
+ UNIT_ASSERT_EQUAL(0, data.GetBlockLen(1));
+ }
+ }
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TChunkedDataTest);
diff --git a/library/cpp/on_disk/chunks/reader.cpp b/library/cpp/on_disk/chunks/reader.cpp
new file mode 100644
index 0000000000..6e28cbf367
--- /dev/null
+++ b/library/cpp/on_disk/chunks/reader.cpp
@@ -0,0 +1,52 @@
+#include <util/generic/cast.h>
+#include <util/memory/blob.h>
+#include <util/system/unaligned_mem.h>
+
+#include "reader.h"
+
+// Reads `count` offsets of type T stored (possibly unaligned) at `aux`,
+// turns each into a pointer relative to `data` and stores them in `*result`,
+// which is resized to exactly `count` entries.
+template <typename T>
+static inline void ReadAux(const char* data, T* aux, T count, TVector<const char*>* result) {
+    TVector<const char*>& pointers = *result;
+    pointers.resize(count);
+
+    const T* entry = aux;
+    for (auto& pointer : pointers) {
+        pointer = data + ReadUnaligned<T>(entry);
+        ++entry;
+    }
+}
+
+// Parses the footer at the end of `blob` and remembers where every block starts.
+// No data is copied: the stored block pointers point into the blob's memory.
+//
+// Two footer layouts are told apart by the last ui32 of the blob:
+//  * non-zero - old format: the value is the block count, preceded by `count` ui32 offsets;
+//  * zero     - current format: ui64 lengths[count], ui64 offsets[count], ui64 count,
+//               ui64 version (must be > 0), ui64 zero.
+// Throws (via Y_ENSURE) when the blob is too small to hold the footer it declares,
+// instead of reading before the start of the buffer.
+TChunkedDataReader::TChunkedDataReader(const TBlob& blob) {
+    const char* cdata = blob.AsCharPtr();
+    const size_t size = blob.Size();
+    Y_ENSURE(size >= sizeof(ui32), "Empty file with chunks. ");
+
+    ui32 last = ReadUnaligned<ui32>((ui32*)(cdata + size) - 1);
+
+    if (last != 0) { // old version file
+        ui32* aux = (ui32*)(cdata + size);
+        ui32 count = last;
+        // The footer is `count` offsets plus the count itself; it must fit into the blob.
+        Y_ENSURE(count <= size / sizeof(ui32) - 1, "Chunks footer does not fit into the blob. ");
+        // size_t arithmetic so that count + 1 cannot wrap around in ui32.
+        const size_t footerItems = size_t(count) + 1;
+        Size = size - footerItems * sizeof(ui32);
+
+        aux -= footerItems;
+        ReadAux<ui32>(cdata, aux, count, &Offsets);
+        return;
+    }
+
+    Y_ENSURE(size >= 3 * sizeof(ui64), "Blob size must be >= 3 * sizeof(ui64). ");
+
+    ui64* aux = (ui64*)(cdata + size);
+    Version = ReadUnaligned<ui64>(aux - 2);
+    Y_ENSURE(Version > 0, "Invalid chunked array version. ");
+
+    ui64 count = ReadUnaligned<ui64>(aux - 3);
+    // The footer is 2 * count + 3 ui64 words (lengths, offsets, count, version, zero).
+    // Written as a division so the check itself cannot overflow.
+    Y_ENSURE(count <= (size / sizeof(ui64) - 3) / 2, "Chunks footer does not fit into the blob. ");
+
+    aux -= (count + 3);
+    ReadAux<ui64>(cdata, aux, count, &Offsets);
+
+    aux -= count;
+    Lengths.resize(count);
+    for (size_t i = 0; i < count; ++i) {
+        Lengths[i] = IntegerCast<size_t>(ReadUnaligned<ui64>(aux + i));
+    }
+}
+
+TBlob TChunkedDataReader::GetBlob(size_t index) const { // non-owning blob (TBlob::NoCopy) over block index; throws for an out-of-range index
+ return TBlob::NoCopy(GetBlock(index), GetBlockLen(index));
+}
diff --git a/library/cpp/on_disk/chunks/reader.h b/library/cpp/on_disk/chunks/reader.h
new file mode 100644
index 0000000000..c5fe783319
--- /dev/null
+++ b/library/cpp/on_disk/chunks/reader.h
@@ -0,0 +1,57 @@
+#pragma once
+
+#include <util/generic/array_ref.h>
+#include <util/generic/vector.h>
+#include <util/generic/yexception.h>
+
+class TBlob;
+
+class TChunkedDataReader { // index over the blocks of a chunked-data blob; stores pointers into the blob, not copies
+public:
+ TChunkedDataReader(const TBlob& blob);
+
+ inline const void* GetBlock(size_t index) const { // start of block index; throws yexception for an out-of-range index
+ CheckIndex(index);
+ return Offsets[index];
+ }
+
+ inline size_t GetBlockLen(size_t index) const { // length of block index in bytes; throws yexception for an out-of-range index
+ CheckIndex(index);
+
+ if (Version == 0) { // old-format files store no lengths, so they are derived from the offsets
+ if (index + 1 < Offsets.size()) {
+ return Offsets[index + 1] - Offsets[index];
+ }
+
+ return Size - (Offsets.back() - Offsets.front()); // last block: data size minus the distance from the first block to the last one
+ }
+
+ return Lengths[index];
+ }
+
+ TBlob GetBlob(size_t index) const;
+
+ template <typename T>
+ TArrayRef<const T> GetRegion(size_t index) const { // views block index as an array of T; throws when its length is not a multiple of sizeof(T)
+ size_t len = GetBlockLen(index);
+ Y_ENSURE(len % sizeof(T) == 0, "wrong data padding");
+ return TArrayRef<const T>(reinterpret_cast<const T*>(GetBlock(index)), len / sizeof(T));
+ }
+
+ inline size_t GetBlocksCount() const {
+ return Offsets.size();
+ }
+
+private:
+ inline void CheckIndex(size_t index) const { // throws yexception unless index < GetBlocksCount()
+ if (index >= GetBlocksCount()) {
+ ythrow yexception() << "requested block " << index << " of " << GetBlocksCount() << " blocks";
+ }
+ }
+
+private:
+ ui64 Version = 0; // stays 0 for old-format files
+ TVector<const char*> Offsets; // start pointer of every block
+ TVector<size_t> Lengths; // filled for current-format files only
+ size_t Size = 0; // set for old-format files only: blob size without the footer
+};
diff --git a/library/cpp/on_disk/chunks/ut/ya.make b/library/cpp/on_disk/chunks/ut/ya.make
new file mode 100644
index 0000000000..0190905cbe
--- /dev/null
+++ b/library/cpp/on_disk/chunks/ut/ya.make
@@ -0,0 +1,9 @@
+UNITTEST_FOR(library/cpp/on_disk/chunks)
+
+OWNER(g:util)
+
+SRCS(
+ chunks_ut.cpp
+)
+
+END()
diff --git a/library/cpp/on_disk/chunks/writer.cpp b/library/cpp/on_disk/chunks/writer.cpp
new file mode 100644
index 0000000000..6dc7397f09
--- /dev/null
+++ b/library/cpp/on_disk/chunks/writer.cpp
@@ -0,0 +1,46 @@
+#include <util/ysaveload.h>
+
+#include "writer.h"
+
+static inline void WriteAux(IOutputStream* out, const TVector<ui64>& data) { // writes the elements of data with SavePodArray
+ ::SavePodArray(out, data.data(), data.size());
+}
+
+/*************************** TChunkedDataWriter ***************************/
+
+TChunkedDataWriter::TChunkedDataWriter(IOutputStream& slave) // keeps a reference to slave; the byte counter starts at 0
+ : Slave(slave)
+ , Offset(0)
+{
+}
+
+TChunkedDataWriter::~TChunkedDataWriter() { // empty: the footer is NOT written here, call WriteFooter() explicitly
+}
+
+// Closes the currently open block (if any) by recording its length, pads the
+// stream up to a 16-byte boundary and opens a new block at that position.
+void TChunkedDataWriter::NewBlock() {
+    const bool hasOpenBlock = !Offsets.empty();
+    if (hasOpenBlock) {
+        // Length is measured before padding, so padding never counts as block data.
+        const ui64 previousStart = Offsets.back();
+        Lengths.push_back(Offset - previousStart);
+    }
+
+    Pad(16);
+    Offsets.push_back(Offset);
+}
+
+// Finishes the stream: records the length of the last open block and writes
+// the footer. Footer layout, in order: all block lengths, all block offsets
+// (ui64 each), the block count, the format version and a zero ui64.
+// TChunkedDataReader treats a zero in the last bytes as the marker of this
+// (current) format.
+void TChunkedDataWriter::WriteFooter() {
+    // With no block ever opened there is no pending length to record, and
+    // calling Offsets.back() on an empty vector would be undefined behaviour.
+    if (!Offsets.empty()) {
+        Lengths.push_back(Offset - Offsets.back());
+    }
+    WriteAux(this, Lengths);
+    WriteAux(this, Offsets);
+    WriteBinary<ui64>(Offsets.size());
+    WriteBinary<ui64>(Version);
+    WriteBinary<ui64>(0);
+}
+
+// Number of bytes written so far into the block opened by the latest NewBlock().
+// Must not be called before the first NewBlock() (checked with Y_ASSERT only).
+size_t TChunkedDataWriter::GetCurrentBlockOffset() const {
+    Y_ASSERT(!Offsets.empty());
+    const ui64 blockStart = Offsets.back();
+    Y_ASSERT(Offset >= blockStart);
+    return Offset - blockStart;
+}
+
+size_t TChunkedDataWriter::GetBlockCount() const { // number of blocks opened so far, including the current one
+ return Offsets.size();
+}
diff --git a/library/cpp/on_disk/chunks/writer.h b/library/cpp/on_disk/chunks/writer.h
new file mode 100644
index 0000000000..ab14522bdd
--- /dev/null
+++ b/library/cpp/on_disk/chunks/writer.h
@@ -0,0 +1,57 @@
+#pragma once
+
+#include <util/generic/vector.h>
+#include <util/stream/output.h>
+
+template <typename T> // writes the sizeof(T) bytes of t to out exactly as they are laid out in memory
+inline void WriteBin(IOutputStream* out, typename TTypeTraits<T>::TFuncParam t) {
+ out->Write(&t, sizeof(T));
+}
+
+class TChunkedDataWriter: public IOutputStream { // output stream that splits everything written to it into 16-byte-aligned blocks and ends with an index footer
+public:
+ TChunkedDataWriter(IOutputStream& slave);
+ ~TChunkedDataWriter() override;
+
+ void NewBlock(); // closes the current block (if any) and starts a new one; non-virtual
+
+ template <typename T>
+ inline void WriteBinary(typename TTypeTraits<T>::TFuncParam t) { // writes the sizeof(T) bytes of t into the current position
+ this->Write(&t, sizeof(T));
+ }
+
+ void WriteFooter(); // must be called explicitly once all blocks are written; the destructor does not do it
+
+ size_t GetCurrentBlockOffset() const;
+ size_t GetBlockCount() const;
+
+protected:
+ void DoWrite(const void* buf, size_t len) override { // forwards to the underlying stream and tracks the total number of bytes written
+ Slave.Write(buf, len);
+ Offset += len;
+ }
+
+private:
+ static inline size_t PaddingSize(size_t size, size_t boundary) noexcept { // bytes needed to round size up to a multiple of boundary (0 if already aligned)
+ const size_t boundaryViolation = size % boundary;
+
+ return boundaryViolation == 0 ? 0 : boundary - boundaryViolation;
+ }
+
+ inline void Pad(size_t boundary) { // writes zero bytes until Offset is a multiple of boundary
+ const size_t newOffset = Offset + PaddingSize(Offset, boundary);
+
+ while (Offset < newOffset) {
+ Write('\0');
+ }
+ }
+
+private:
+ static const ui64 Version = 1; // written into the footer
+
+ IOutputStream& Slave;
+
+ size_t Offset; // total bytes written through this writer so far
+ TVector<ui64> Offsets; // start offset of every block
+ TVector<ui64> Lengths; // length of every closed block
+};
diff --git a/library/cpp/on_disk/chunks/ya.make b/library/cpp/on_disk/chunks/ya.make
new file mode 100644
index 0000000000..acb52df5b0
--- /dev/null
+++ b/library/cpp/on_disk/chunks/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+OWNER(g:util)
+
+SRCS(
+ chunked_helpers.cpp
+ reader.cpp
+ writer.cpp
+)
+
+END()