diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/on_disk/chunks | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/on_disk/chunks')
-rw-r--r-- | library/cpp/on_disk/chunks/chunked_helpers.cpp | 67 | ||||
-rw-r--r-- | library/cpp/on_disk/chunks/chunked_helpers.h | 674 | ||||
-rw-r--r-- | library/cpp/on_disk/chunks/chunks_ut.cpp | 329 | ||||
-rw-r--r-- | library/cpp/on_disk/chunks/reader.cpp | 52 | ||||
-rw-r--r-- | library/cpp/on_disk/chunks/reader.h | 57 | ||||
-rw-r--r-- | library/cpp/on_disk/chunks/ut/ya.make | 9 | ||||
-rw-r--r-- | library/cpp/on_disk/chunks/writer.cpp | 46 | ||||
-rw-r--r-- | library/cpp/on_disk/chunks/writer.h | 57 | ||||
-rw-r--r-- | library/cpp/on_disk/chunks/ya.make | 11 |
9 files changed, 1302 insertions, 0 deletions
diff --git a/library/cpp/on_disk/chunks/chunked_helpers.cpp b/library/cpp/on_disk/chunks/chunked_helpers.cpp new file mode 100644 index 0000000000..b7adba2753 --- /dev/null +++ b/library/cpp/on_disk/chunks/chunked_helpers.cpp @@ -0,0 +1,67 @@ +#include <util/ysaveload.h> + +#include "chunked_helpers.h" + +TBlob GetBlock(const TBlob& blob, size_t index) { + TChunkedDataReader reader(blob); + if (index >= reader.GetBlocksCount()) + ythrow yexception() << "index " << index << " is >= than block count " << reader.GetBlocksCount(); + size_t begin = (const char*)reader.GetBlock(index) - (const char*)blob.Data(); + return blob.SubBlob(begin, begin + reader.GetBlockLen(index)); +} + +/*************************** TNamedChunkedDataReader ***************************/ + +static const char* NamedChunkedDataMagic = "NamedChunkedData"; + +TNamedChunkedDataReader::TNamedChunkedDataReader(const TBlob& blob) + : TChunkedDataReader(blob) +{ + if (TChunkedDataReader::GetBlocksCount() < 1) + throw yexception() << "Too few blocks"; + + size_t block = TChunkedDataReader::GetBlocksCount() - 1; + size_t magicLen = strlen(NamedChunkedDataMagic); + if (GetBlockLen(block) < magicLen || memcmp(GetBlock(block), NamedChunkedDataMagic, magicLen) != 0) + throw yexception() << "Not a valid named chunked data file"; + + TMemoryInput input(static_cast<const char*>(GetBlock(block)) + magicLen, GetBlockLen(block) - magicLen); + Load(&input, Names); + + size_t index = 0; + for (TVector<TString>::const_iterator it = Names.begin(); it != Names.end(); ++it, ++index) { + if (!it->empty()) + NameToIndex[*it] = index; + } +} + +/*************************** TNamedChunkedDataWriter ***************************/ + +TNamedChunkedDataWriter::TNamedChunkedDataWriter(IOutputStream& slave) + : TChunkedDataWriter(slave) +{ +} + +TNamedChunkedDataWriter::~TNamedChunkedDataWriter() { +} + +void TNamedChunkedDataWriter::NewBlock() { + NewBlock(""); +} + +void TNamedChunkedDataWriter::NewBlock(const TString& name) { + if (!name.empty()) { + if (NameToIndex.count(name) != 0) + throw yexception() << "Block name is not unique"; + NameToIndex[name] = Names.size(); + } + Names.push_back(name); + TChunkedDataWriter::NewBlock(); +} + +void TNamedChunkedDataWriter::WriteFooter() { + NewBlock(""); + Write(NamedChunkedDataMagic); + Save(this, Names); + TChunkedDataWriter::WriteFooter(); +} diff --git a/library/cpp/on_disk/chunks/chunked_helpers.h b/library/cpp/on_disk/chunks/chunked_helpers.h new file mode 100644 index 0000000000..5fa96afdca --- /dev/null +++ b/library/cpp/on_disk/chunks/chunked_helpers.h @@ -0,0 +1,674 @@ +#pragma once + +#include <util/generic/vector.h> +#include <util/generic/buffer.h> +#include <util/generic/hash_set.h> +#include <util/generic/cast.h> +#include <util/generic/ymath.h> +#include <util/memory/blob.h> +#include <util/stream/buffer.h> +#include <util/stream/mem.h> +#include <util/system/unaligned_mem.h> +#include <util/ysaveload.h> + +#include "reader.h" +#include "writer.h" + +#include <cmath> +#include <cstddef> + +template <typename T> +class TYVector { +private: + ui32 Size; + const T* Data; + +public: + TYVector(const TBlob& blob) + : Size(IntegerCast<ui32>(ReadUnaligned<ui64>(blob.Data()))) + , Data((const T*)((const char*)blob.Data() + sizeof(ui64))) + { + } + + void Get(size_t idx, T& t) const { + assert(idx < (size_t)Size); + t = ReadUnaligned<T>(Data + idx); + } + + const T& At(size_t idx) const { + assert(idx < (size_t)Size); + return Data[idx]; + } + + size_t GetSize() const { + return Size; + } + + size_t RealSize() const { + return sizeof(ui64) + Size * sizeof(T); + } + + ~TYVector() = default; +}; + +template <typename T> +class TYVectorWriter { +private: + TVector<T> Vector; + +public: + TYVectorWriter() = default; + + void PushBack(const T& value) { + Vector.push_back(value); + } + + void Save(IOutputStream& out) const { + ui64 uSize = (ui64)Vector.size(); + out.Write(&uSize, sizeof(uSize)); + out.Write(Vector.data(), Vector.size() * sizeof(T)); + } + + const T& At(size_t idx) const { + assert(idx < Size()); + return Vector[idx]; + } + + T& At(size_t idx) { + assert(idx < Size()); + return Vector[idx]; + } + + void Clear() { + Vector.clear(); + } + + size_t Size() const { + return Vector.size(); + } + + void Resize(size_t size) { + Vector.resize(size); + } + + void Resize(size_t size, const T& value) { + Vector.resize(size, value); + } +}; + +template <typename T, bool> +struct TYVectorG; + +template <typename X> +struct TYVectorG<X, false> { + typedef TYVector<X> T; +}; + +template <typename X> +struct TYVectorG<X, true> { + typedef TYVectorWriter<X> T; +}; + +template <typename T> +struct TIsMemsetThisWithZeroesSupported { + enum { + Result = TTypeTraits<T>::IsPod + }; +}; + +#define MEMSET_THIS_WITH_ZEROES_SUPPORTED(type) \ + template <> \ + struct TIsMemsetThisWithZeroesSupported<type> { \ + enum { \ + Result = true \ + }; \ + }; + +class TPlainHashCommon { +protected: +#pragma pack(push, 8) + template <typename TKey, typename TValue> + class TPackedPair { + private: + typedef TPackedPair<TKey, TValue> TThis; + TKey Key; + TValue Value; + + private: + static_assert(TIsMemsetThisWithZeroesSupported<TKey>::Result, "expect TIsMemsetThisWithZeroesSupported<TKey>::Result"); + static_assert(TIsMemsetThisWithZeroesSupported<TValue>::Result, "expect TIsMemsetThisWithZeroesSupported<TValue>::Result"); + + /// to aviod uninitialized bytes + void Init(const TKey& key, const TValue& value) { + memset(static_cast<TThis*>(this), 0, sizeof(TThis)); + Key = key; + Value = value; + } + + public: + TPackedPair(typename TTypeTraits<TKey>::TFuncParam key, typename TTypeTraits<TValue>::TFuncParam value) { + Init(key, value); + } + + TPackedPair(const TThis& rhs) { + Init(rhs.Key, rhs.Value); + } + + TPackedPair& operator=(const TThis& rhs) { + if (this != &rhs) { + Init(rhs.Key, rhs.Value); + } + return *this; + } + + TPackedPair() { + Init(TKey(), TValue()); + } + + typename TTypeTraits<TKey>::TFuncParam First() const { + return Key; + } + + typename TTypeTraits<TValue>::TFuncParam Second() const { + return Value; + } + + static TKey GetFirst(const void* self) { + static constexpr size_t offset = offsetof(TThis, Key); + return ReadUnaligned<TKey>(reinterpret_cast<const char*>(self) + offset); + } + + static TValue GetSecond(const void* self) { + static constexpr size_t offset = offsetof(TThis, Value); + return ReadUnaligned<TValue>(reinterpret_cast<const char*>(self) + offset); + } + }; +#pragma pack(pop) + +protected: + static const ui16 VERSION_ID = 2; + +#pragma pack(push, 8) + struct TInterval { + static const ui32 INVALID = (ui32)-1; + ui32 Offset; + ui32 Length; + + TInterval() + : Offset(INVALID) + , Length(INVALID) + { + } + + TInterval(ui32 offset, ui32 length) + : Offset(offset) + , Length(length) + { + } + + static inline ui32 GetOffset(const TInterval* self) { + static constexpr size_t offset = offsetof(TInterval, Offset); + return ReadUnaligned<ui32>(reinterpret_cast<const char*>(self) + offset); + } + + static inline ui32 GetLength(const TInterval* self) { + static constexpr size_t offset = offsetof(TInterval, Length); + return ReadUnaligned<ui32>(reinterpret_cast<const char*>(self) + offset); + } + }; +#pragma pack(pop) + static_assert(8 == sizeof(TInterval), "expect 8 == sizeof(TInterval)"); + + template <typename TKey> + static ui32 KeyHash(typename TTypeTraits<TKey>::TFuncParam key, ui16 bits) { + Y_ASSERT(bits < 32); + const ui32 res = ui32(key) & ((ui32(1) << bits) - 1); + + Y_ASSERT(res < (ui32(1) << bits)); + return res; + } +}; + +template <typename TKey, typename TValue> +class TPlainHashWriter : TPlainHashCommon { +private: + typedef TPackedPair<TKey, TValue> TKeyValuePair; + typedef TVector<TKeyValuePair> TData; + TData Data; + typedef TVector<TData> TData2; + + bool IsPlainEnought(ui16 bits) const { + TVector<size_t> counts(1LL << bits, 0); + for (size_t i = 0; i < Data.size(); ++i) { + size_t& count = counts[KeyHash<TKey>(TKeyValuePair::GetFirst(&Data[i]), bits)]; + ++count; + if (count > 2) + return false; + } + return true; + } + +public: + void Add(const TKey& key, const TValue& value) { + Data.push_back(TKeyValuePair(key, value)); + } + + void Save(IOutputStream& out) const { + Y_ASSERT(Data.size() < Max<ui32>()); + + WriteBin<ui16>(&out, VERSION_ID); + static const ui32 PAIR_SIZE = sizeof(TKeyValuePair); + WriteBin<ui32>(&out, PAIR_SIZE); + + ui16 bits; + if (!Data.empty()) { + bits = (ui16)(log((float)Data.size()) / log(2.f)); + while ((bits < 22) && !IsPlainEnought(bits)) + ++bits; + } else { + bits = 0; + } + WriteBin<ui16>(&out, bits); + WriteBin<ui32>(&out, (ui32)Data.size()); + + const ui32 nBuckets = ui32(1) << bits; + TData2 data2(nBuckets); + for (size_t i = 0; i < Data.size(); ++i) + data2[KeyHash<TKey>(TKeyValuePair::GetFirst(&Data[i]), bits)].push_back(Data[i]); + + typedef TVector<TInterval> TIntervals; + TIntervals intervals(nBuckets); + ui32 offset = 0; + for (ui32 i = 0; i < nBuckets; ++i) { + intervals[i].Offset = offset; + intervals[i].Length = (ui32)data2[i].size(); + offset += (ui32)data2[i].size(); + } +#ifndef NDEBUG + for (ui32 i = 0; i < nBuckets; ++i) { + for (size_t j = 0; j < data2[i].size(); ++j) + for (size_t k = j + 1; k < data2[i].size(); ++k) + if (TKeyValuePair::GetFirst(&data2[i][j]) == TKeyValuePair::GetFirst(&data2[i][k])) + ythrow yexception() << "key clash"; + } +#endif + out.Write(intervals.data(), intervals.size() * sizeof(intervals[0])); + for (ui32 i = 0; i < nBuckets; ++i) + out.Write(data2[i].data(), data2[i].size() * sizeof(data2[i][0])); + } +}; + +template <typename TKey, typename TValue> +class TPlainHash : TPlainHashCommon { +private: + typedef TPackedPair<TKey, TValue> TKeyValuePair; + + const char* P; + + ui16 GetBits() const { + return ReadUnaligned<ui16>(P + 6); + } + + ui32 GetSize() const { + return ReadUnaligned<ui32>(P + 8); + } + + const TInterval* GetIntervals() const { + return (const TInterval*)(P + 12); + } + + const TKeyValuePair* GetData() const { + return (const TKeyValuePair*)(GetIntervals() + (1ULL << GetBits())); + } + + template <typename T> + void Init(const T* p) { + static_assert(sizeof(T) == 1, "expect sizeof(T) == 1"); + P = reinterpret_cast<const char*>(p); +#ifndef NDEBUG + ui16 version = ReadUnaligned<ui16>(p); + if (version != VERSION_ID) + ythrow yexception() << "bad version: " << version; + static const ui32 PAIR_SIZE = sizeof(TKeyValuePair); + const ui32 size = ReadUnaligned<ui32>(p + 2); + if (size != PAIR_SIZE) + ythrow yexception() << "bad size " << size << " instead of " << PAIR_SIZE; +#endif + } + +public: + typedef const TKeyValuePair* TConstIterator; + + TPlainHash(const char* p) { + Init(p); + } + + TPlainHash(const TBlob& blob) { + Init(blob.Begin()); + } + + bool Find(typename TTypeTraits<TKey>::TFuncParam key, TValue* res) const { + // Cerr << GetBits() << "\t" << (1 << GetBits()) << "\t" << GetSize() << Endl; + const ui32 hash = KeyHash<TKey>(key, GetBits()); + const TInterval* intervalPtr = GetIntervals(); + const TKeyValuePair* pair = GetData() + TInterval::GetOffset(intervalPtr + hash); + const ui32 length = TInterval::GetLength(intervalPtr + hash); + for (ui32 i = 0; i < length; ++i, ++pair) { + if (TKeyValuePair::GetFirst(pair) == key) { + *res = TKeyValuePair::GetSecond(pair); + return true; + } + } + return false; + } + + TValue Get(typename TTypeTraits<TKey>::TFuncParam key) const { + TValue res; + if (Find(key, &res)) + return res; + else + ythrow yexception() << "key not found"; + } + + TConstIterator Begin() const { + return GetData(); + } + + TConstIterator End() const { + return GetData() + GetSize(); + } + + const char* ByteEnd() const { + return (const char*)(GetData() + GetSize()); + } + + size_t ByteSize() const { + return 12 + sizeof(TInterval) * (size_t(1) << GetBits()) + sizeof(TKeyValuePair) * GetSize(); + } +}; + +template <typename Key, typename Value, bool> +struct TPlainHashG; + +template <typename Key, typename Value> +struct TPlainHashG<Key, Value, false> { + typedef TPlainHash<Key, Value> T; +}; + +template <typename Key, typename Value> +struct TPlainHashG<Key, Value, true> { + typedef TPlainHashWriter<Key, Value> T; +}; + +template <typename T> +class TSingleValue { +private: + const T* Value; + +public: + TSingleValue(const TBlob& blob) { + Y_ASSERT(blob.Length() >= sizeof(T)); + Y_ASSERT(blob.Length() <= sizeof(T) + 16); + Value = reinterpret_cast<const T*>(blob.Begin()); + } + + const T& Get() const { + return *Value; + } +}; + +template <typename T> +class TSingleValueWriter { +private: + T Value; + +public: + TSingleValueWriter() = default; + + TSingleValueWriter(const T& value) + : Value(value) + { + } + + void Set(const T& value) { + Value = value; + } + + void Save(IOutputStream& out) const { + out.Write(&Value, sizeof(Value)); + } +}; + +TBlob GetBlock(const TBlob& data, size_t index); + +template <class T> +void WriteBlock(TChunkedDataWriter& writer, const T& t) { + writer.NewBlock(); + t.Save(writer); +} + +template <class T> +void WriteBlock(TChunkedDataWriter& writer, T& t) { + writer.NewBlock(); + t.Save(writer); +} + +// Extends TChunkedDataWriter, allowing user to name blocks with arbitrary strings. +class TNamedChunkedDataWriter: public TChunkedDataWriter { +public: + TNamedChunkedDataWriter(IOutputStream& slave); + ~TNamedChunkedDataWriter() override; + + // Start a new unnamed block, overrides TChunkedDataReader::NewBlock(). + void NewBlock(); + + // Start a new block with given name (possibly empty, in which case block is unnamed). + // Throws an exception if name is a duplicate. + void NewBlock(const TString& name); + + void WriteFooter(); + +private: + TVector<TString> Names; + THashMap<TString, size_t> NameToIndex; +}; + +class TNamedChunkedDataReader: public TChunkedDataReader { +public: + TNamedChunkedDataReader(const TBlob& blob); + + inline bool HasBlock(const char* name) const { + return NameToIndex.find(name) != NameToIndex.end(); + } + + inline size_t GetIndexByName(const char* name) const { + THashMap<TString, size_t>::const_iterator it = NameToIndex.find(name); + if (it == NameToIndex.end()) + throw yexception() << "Block \"" << name << "\" is not found"; + else + return it->second; + } + + // Returns number of blocks written to the file by user of TNamedChunkedDataReader. + inline size_t GetBlocksCount() const { + // Last block is for internal usage + return TChunkedDataReader::GetBlocksCount() - 1; + } + + inline const char* GetBlockName(size_t index) const { + Y_ASSERT(index < GetBlocksCount()); + return Names[index].data(); + } + + inline const void* GetBlockByName(const char* name) const { + return GetBlock(GetIndexByName(name)); + } + + inline size_t GetBlockLenByName(const char* name) const { + return GetBlockLen(GetIndexByName(name)); + } + + inline TBlob GetBlobByName(const char* name) const { + size_t id = GetIndexByName(name); + return TBlob::NoCopy(GetBlock(id), GetBlockLen(id)); + } + + inline bool GetBlobByName(const char* name, TBlob& blob) const { + THashMap<TString, size_t>::const_iterator it = NameToIndex.find(name); + if (it == NameToIndex.end()) + return false; + blob = TBlob::NoCopy(GetBlock(it->second), GetBlockLen(it->second)); + return true; + } + +private: + TVector<TString> Names; + THashMap<TString, size_t> NameToIndex; +}; + +template <class T> +struct TSaveLoadVectorNonPodElement { + static inline void Save(IOutputStream* out, const T& t) { + TSerializer<T>::Save(out, t); + } + + static inline void Load(IInputStream* in, T& t, size_t elementSize) { + Y_ASSERT(elementSize > 0); + TSerializer<T>::Load(in, t); + } +}; + +template <class T, bool isPod> +class TVectorTakingIntoAccountThePodType { +private: + ui64 SizeofOffsets; + const ui64* Offsets; + const char* Data; + +public: + TVectorTakingIntoAccountThePodType(const TBlob& blob) { + SizeofOffsets = ReadUnaligned<ui64>(blob.Begin()); + Y_ASSERT(SizeofOffsets > 0); + Offsets = reinterpret_cast<const ui64*>(blob.Begin() + sizeof(ui64)); + Data = reinterpret_cast<const char*>(blob.Begin() + sizeof(ui64) + SizeofOffsets * sizeof(ui64)); + } + + size_t GetSize() const { + return (size_t)(SizeofOffsets - 1); + } + + size_t GetLength(ui64 index) const { + if (index + 1 >= SizeofOffsets) + ythrow yexception() << "bad offset"; + return IntegerCast<size_t>(ReadUnaligned<ui64>(Offsets + index + 1) - ReadUnaligned<ui64>(Offsets + index)); + } + + void Get(ui64 index, T& t) const { + const size_t len = GetLength(index); + TMemoryInput input(Data + ReadUnaligned<ui64>(Offsets + index), len); + TSaveLoadVectorNonPodElement<T>::Load(&input, t, len); + } + + T Get(ui64 index) const { + T ret; + Get(index, ret); + return ret; + } + + size_t RealSize() const { + return sizeof(ui64) * (SizeofOffsets + 1) + ReadUnaligned<ui64>(Offsets + SizeofOffsets - 1); + } +}; + +template <class T, bool isPod> +class TVectorTakingIntoAccountThePodTypeWriter : TNonCopyable { +private: + typedef TVector<ui64> TOffsets; + TOffsets Offsets; + TBuffer Data; + TBufferOutput DataStream; + +public: + TVectorTakingIntoAccountThePodTypeWriter() + : DataStream(Data) + { + } + + void PushBack(const T& t) { + Offsets.push_back((ui64) Data.size()); + TSaveLoadVectorNonPodElement<T>::Save(&DataStream, t); + } + + size_t Size() const { + return Offsets.size(); + } + + void Save(IOutputStream& out) const { + ui64 sizeofOffsets = Offsets.size() + 1; + out.Write(&sizeofOffsets, sizeof(sizeofOffsets)); + out.Write(Offsets.data(), Offsets.size() * sizeof(Offsets[0])); + ui64 lastOffset = (ui64) Data.size(); + out.Write(&lastOffset, sizeof(lastOffset)); + out.Write(Data.data(), Data.size()); + } +}; + +template <class T> +class TVectorTakingIntoAccountThePodType<T, true>: public TYVector<T> { +public: + TVectorTakingIntoAccountThePodType(const TBlob& blob) + : TYVector<T>(blob) + { + } +}; + +template <class T> +class TVectorTakingIntoAccountThePodTypeWriter<T, true>: public TYVectorWriter<T> { +}; + +template <typename T> +class TGeneralVector: public TVectorTakingIntoAccountThePodType<T, TTypeTraits<T>::IsPod> { + typedef TVectorTakingIntoAccountThePodType<T, TTypeTraits<T>::IsPod> TBase; + +public: + TGeneralVector(const TBlob& blob) + : TBase(blob) + { + } +}; + +template <typename T> +class TGeneralVectorWriter: public TVectorTakingIntoAccountThePodTypeWriter<T, TTypeTraits<T>::IsPod> { +}; + +template <typename TItem, bool> +struct TGeneralVectorG; + +template <typename TItem> +struct TGeneralVectorG<TItem, false> { + typedef TGeneralVector<TItem> T; +}; + +template <typename TItem> +struct TGeneralVectorG<TItem, true> { + typedef TGeneralVectorWriter<TItem> T; +}; + +template <> +struct TSaveLoadVectorNonPodElement<TString> { + static inline void Save(IOutputStream* out, const TString& s) { + out->Write(s.data(), s.size() + 1); + } + + static inline void Load(TMemoryInput* in, TString& s, size_t elementSize) { + Y_ASSERT(elementSize > 0 && in->Avail() >= elementSize); + s.assign(in->Buf(), elementSize - 1); /// excluding 0 at the end + } +}; + +template <bool G> +struct TStringsVectorG: public TGeneralVectorG<TString, G> { +}; + +using TStringsVector = TGeneralVector<TString>; +using TStringsVectorWriter = TGeneralVectorWriter<TString>; diff --git a/library/cpp/on_disk/chunks/chunks_ut.cpp b/library/cpp/on_disk/chunks/chunks_ut.cpp new file mode 100644 index 0000000000..f727647f7f --- /dev/null +++ b/library/cpp/on_disk/chunks/chunks_ut.cpp @@ -0,0 +1,329 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <util/stream/file.h> +#include <util/system/filemap.h> +#include <util/system/tempfile.h> + +#include "chunked_helpers.h" + +/// Data for TChunkedHelpersTest::TestGeneralVector +struct TPodStruct { + int x; + float y; + TPodStruct(int _x = 0, float _y = 0) + : x(_x) + , y(_y) + { + } +}; +/// And its serialization +template <> +struct TSaveLoadVectorNonPodElement<TPodStruct> { + typedef TPodStruct TItem; + static inline void Save(IOutputStream* out, const TItem& item) { + TSerializer<int>::Save(out, item.x); + TSerializer<float>::Save(out, item.y); + } + + static inline void Load(IInputStream* in, TItem& item, size_t elementSize) { + Y_ASSERT(elementSize == sizeof(TItem)); + TSerializer<int>::Load(in, item.x); + TSerializer<float>::Load(in, item.y); + } +}; + +class TChunkedHelpersTest: public TTestBase { + UNIT_TEST_SUITE(TChunkedHelpersTest); + UNIT_TEST(TestHash) + UNIT_TEST(TestGeneralVector) + UNIT_TEST(TestStrings); + UNIT_TEST(TestNamedChunkedData); + UNIT_TEST_SUITE_END(); + +public: + void TestHash() { + { + TBufferStream stream; + { + TPlainHashWriter<ui64, ui16> writer; + writer.Add(5, 7); + writer.Save(stream); + } + + { + TBlob temp = TBlob::FromStreamSingleThreaded(stream); + TPlainHash<ui64, ui16> reader(temp); + ui16 value = 0; + UNIT_ASSERT(reader.Find(5, &value)); + UNIT_ASSERT_EQUAL(7, value); + UNIT_ASSERT(!reader.Find(6, &value)); + } + } + + { + TBufferStream stream; + int v = 1; + wchar16 k = 'a'; + { + TPlainHashWriter<wchar16, void*> writer; + writer.Add(k, &v); + writer.Save(stream); + } + { + TBlob temp = TBlob::FromStreamSingleThreaded(stream); + TPlainHash<wchar16, void*> reader(temp); + void* value = nullptr; + UNIT_ASSERT(reader.Find(k, &value)); + UNIT_ASSERT_EQUAL((int*)value, &v); + } + } + } + + void TestGeneralVector() { + { /// ui32 + const size_t N = 3; + TBufferStream stream; + { + TGeneralVectorWriter<ui32> writer; + for (size_t i = 0; i < N; ++i) + writer.PushBack(i); + writer.Save(stream); + } + { + TBlob temp = TBlob::FromStreamSingleThreaded(stream); + TGeneralVector<ui32> reader(temp); + UNIT_ASSERT_EQUAL(reader.GetSize(), N); + for (size_t i = 0; i < N; ++i) { + ui32 value; + reader.Get(i, value); + UNIT_ASSERT_EQUAL(value, i); + UNIT_ASSERT_EQUAL(reader.At(i), i); + } + UNIT_ASSERT_EQUAL(reader.RealSize(), sizeof(ui64) + N * sizeof(ui32)); + } + } + { /// TString + const size_t N = 4; + TBufferStream stream; + { + TGeneralVectorWriter<TString> writer; + for (size_t i = 0; i < N; ++i) + writer.PushBack(ToString(i)); + writer.Save(stream); + } + { + TBlob temp = TBlob::FromStreamSingleThreaded(stream); + TGeneralVector<TString> reader(temp); + UNIT_ASSERT_EQUAL(reader.GetSize(), N); + for (size_t i = 0; i < N; ++i) { + TString value; + reader.Get(i, value); + UNIT_ASSERT_EQUAL(value, ToString(i)); + UNIT_ASSERT_EQUAL(reader.Get(i), ToString(i)); + } + UNIT_ASSERT_EQUAL(reader.RealSize(), sizeof(ui64) * (N + 2) + N * 2); + } + } + { /// some other struct + typedef TPodStruct TItem; + const size_t N = 2; + TBufferStream stream; + { + TGeneralVectorWriter<TItem> writer; + writer.PushBack(TItem(1, 2)); + writer.PushBack(TItem(3, 4)); + writer.Save(stream); + } + { + TBlob temp = TBlob::FromStreamSingleThreaded(stream); + TGeneralVector<TItem> reader(temp); + UNIT_ASSERT_EQUAL(reader.GetSize(), N); + + TItem value; + reader.Get(0, value); + UNIT_ASSERT(value.x == 1 && value.y == 2.0); + + reader.Get(1, value); + UNIT_ASSERT(value.x == 3 && value.y == 4.0); + + UNIT_ASSERT_EQUAL(reader.RealSize(), sizeof(ui64) * (N + 2) + N * sizeof(TItem)); + } + } + { /// pointer + const size_t N = 3; + TVector<int> data_holder(N); + int* a = &(data_holder[0]); + TBufferStream stream; + { + TGeneralVectorWriter<int*> writer; + for (size_t i = 0; i < N; ++i) { + a[i] = i; + writer.PushBack(a + i); + } + writer.Save(stream); + } + { + TBlob temp = TBlob::FromStreamSingleThreaded(stream); + TGeneralVector<int*> reader(temp); + UNIT_ASSERT_EQUAL(reader.GetSize(), N); + for (size_t i = 0; i < N; ++i) { + int* value; + reader.Get(i, value); + UNIT_ASSERT_EQUAL(value, a + i); + UNIT_ASSERT_EQUAL(reader.At(i), a + i); + } + UNIT_ASSERT_EQUAL(reader.RealSize(), sizeof(ui64) + N * sizeof(int*)); + } + } + { /// std::pair<int, int> + typedef std::pair<int, int> TItem; + const size_t N = 3; + TBufferStream stream; + { + TGeneralVectorWriter<TItem> writer; + for (size_t i = 0; i < N; ++i) + writer.PushBack(TItem(i, i)); + writer.Save(stream); + } + { + TBlob temp = TBlob::FromStreamSingleThreaded(stream); + TGeneralVector<TItem> reader(temp); + UNIT_ASSERT_EQUAL(reader.GetSize(), N); + for (size_t i = 0; i < N; ++i) { + TItem value; + reader.Get(i, value); + UNIT_ASSERT_EQUAL(value, TItem(i, i)); + } + UNIT_ASSERT_EQUAL(reader.RealSize(), sizeof(ui64) + N * sizeof(TItem)); + } + } + } + + void TestStrings() { + const TString FILENAME = "chunked_helpers_test.bin"; + TTempFileHandle file(FILENAME.c_str()); + + { + TFixedBufferFileOutput fOut(FILENAME); + TStringsVectorWriter stringsWriter; + stringsWriter.PushBack(""); + stringsWriter.PushBack("test"); + TChunkedDataWriter writer(fOut); + WriteBlock(writer, stringsWriter); + writer.WriteFooter(); + } + + { + TBlob fIn = TBlob::FromFileSingleThreaded(FILENAME); + TStringsVector vct(GetBlock(fIn, 0)); + UNIT_ASSERT_EQUAL(vct.Get(0), ""); + UNIT_ASSERT_EQUAL(vct.Get(1), "test"); + + bool wasException = false; + try { + vct.Get(2); + } catch (...) { + wasException = true; + } + UNIT_ASSERT(wasException); + } + } + + void TestNamedChunkedData() { + const TString filename = MakeTempName(nullptr, "named_chunked_data_test"); + TTempFile file(filename); + + { + TFixedBufferFileOutput fOut(filename); + TNamedChunkedDataWriter writer(fOut); + + writer.NewBlock("alpha"); + writer.Write("123456"); + + writer.NewBlock(); + writer.Write("anonymous"); + + writer.NewBlock("omega"); + writer.Write("12345678901234567"); + + writer.WriteFooter(); + } + + { + TBlob mf = TBlob::FromFileSingleThreaded(filename); + TNamedChunkedDataReader reader(mf); + + UNIT_ASSERT(reader.GetBlocksCount() == 3); + + UNIT_ASSERT(reader.HasBlock("alpha")); + UNIT_ASSERT(reader.HasBlock("omega")); + + UNIT_ASSERT_STRINGS_EQUAL(reader.GetBlockName(0), "alpha"); + UNIT_ASSERT_STRINGS_EQUAL(reader.GetBlockName(1), ""); + UNIT_ASSERT_STRINGS_EQUAL(reader.GetBlockName(2), "omega"); + + UNIT_ASSERT_EQUAL(reader.GetBlockLenByName("alpha"), 6); // padding not included + UNIT_ASSERT_EQUAL(reader.GetBlockLenByName("omega"), 17); + + UNIT_ASSERT(memcmp(reader.GetBlockByName("alpha"), "123456", 6) == 0); + UNIT_ASSERT(memcmp(reader.GetBlock(1), "anonymous", 9) == 0); + UNIT_ASSERT(memcmp(reader.GetBlockByName("omega"), "12345678901234567", 17) == 0); + } + } +}; + +UNIT_TEST_SUITE_REGISTRATION(TChunkedHelpersTest); + +class TChunkedDataTest: public TTestBase { +private: + UNIT_TEST_SUITE(TChunkedDataTest); + UNIT_TEST(Test) + UNIT_TEST(TestEmpty) + UNIT_TEST_SUITE_END(); + + void Test() { + TBuffer buffer; + { + TBufferOutput out(buffer); + TChunkedDataWriter writer(out); + writer.NewBlock(); + writer << "test"; + writer.NewBlock(); + writer << 4; + writer.NewBlock(); + writer.NewBlock(); + writer << 1; + writer << 2; + writer.WriteFooter(); + } + { + TBlob blob = TBlob::FromBufferSingleThreaded(buffer); + TChunkedDataReader data(blob); + // printf("%d\n", (int)data.GetBlockLen(3)); + UNIT_ASSERT_EQUAL(4, data.GetBlockLen(0)); + UNIT_ASSERT_EQUAL(1, data.GetBlockLen(1)); + UNIT_ASSERT_EQUAL(0, data.GetBlockLen(2)); + UNIT_ASSERT_EQUAL(2, data.GetBlockLen(3)); + } + } + + void TestEmpty() { + TBuffer buffer; + { + TBufferOutput out(buffer); + TChunkedDataWriter writer(out); + writer.NewBlock(); + writer.NewBlock(); + writer.WriteFooter(); + } + { + TBlob blob = TBlob::FromBufferSingleThreaded(buffer); + TChunkedDataReader data(blob); + // printf("%d\n", (int)data.GetBlockLen(1)); + UNIT_ASSERT_EQUAL(0, data.GetBlockLen(0)); + UNIT_ASSERT_EQUAL(0, data.GetBlockLen(1)); + } + } +}; + +UNIT_TEST_SUITE_REGISTRATION(TChunkedDataTest); diff --git a/library/cpp/on_disk/chunks/reader.cpp b/library/cpp/on_disk/chunks/reader.cpp new file mode 100644 index 0000000000..6e28cbf367 --- /dev/null +++ b/library/cpp/on_disk/chunks/reader.cpp @@ -0,0 +1,52 @@ +#include <util/generic/cast.h> +#include <util/memory/blob.h> +#include <util/system/unaligned_mem.h> + +#include "reader.h" + +template <typename T> +static inline void ReadAux(const char* data, T* aux, T count, TVector<const char*>* result) { + result->resize(count); + for (size_t i = 0; i < count; ++i) { + (*result)[i] = data + ReadUnaligned<T>(aux + i); + } +} + +TChunkedDataReader::TChunkedDataReader(const TBlob& blob) { + const char* cdata = blob.AsCharPtr(); + const size_t size = blob.Size(); + Y_ENSURE(size >= sizeof(ui32), "Empty file with chunks. "); + + ui32 last = ReadUnaligned<ui32>((ui32*)(cdata + size) - 1); + + if (last != 0) { // old version file + ui32* aux = (ui32*)(cdata + size); + ui32 count = last; + Size = size - (count + 1) * sizeof(ui32); + + aux -= (count + 1); + ReadAux<ui32>(cdata, aux, count, &Offsets); + return; + } + + Y_ENSURE(size >= 3 * sizeof(ui64), "Blob size must be >= 3 * sizeof(ui64). "); + + ui64* aux = (ui64*)(cdata + size); + Version = ReadUnaligned<ui64>(aux - 2); + Y_ENSURE(Version > 0, "Invalid chunked array version. "); + + ui64 count = ReadUnaligned<ui64>(aux - 3); + + aux -= (count + 3); + ReadAux<ui64>(cdata, aux, count, &Offsets); + + aux -= count; + Lengths.resize(count); + for (size_t i = 0; i < count; ++i) { + Lengths[i] = IntegerCast<size_t>(ReadUnaligned<ui64>(aux + i)); + } +} + +TBlob TChunkedDataReader::GetBlob(size_t index) const { + return TBlob::NoCopy(GetBlock(index), GetBlockLen(index)); +} diff --git a/library/cpp/on_disk/chunks/reader.h b/library/cpp/on_disk/chunks/reader.h new file mode 100644 index 0000000000..c5fe783319 --- /dev/null +++ b/library/cpp/on_disk/chunks/reader.h @@ -0,0 +1,57 @@ +#pragma once + +#include <util/generic/array_ref.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> + +class TBlob; + +class TChunkedDataReader { +public: + TChunkedDataReader(const TBlob& blob); + + inline const void* GetBlock(size_t index) const { + CheckIndex(index); + return Offsets[index]; + } + + inline size_t GetBlockLen(size_t index) const { + CheckIndex(index); + + if (Version == 0) { + if (index + 1 < Offsets.size()) { + return Offsets[index + 1] - Offsets[index]; + } + + return Size - (Offsets.back() - Offsets.front()); + } + + return Lengths[index]; + } + + TBlob GetBlob(size_t index) const; + + template <typename T> + TArrayRef<const T> GetRegion(size_t index) const { + size_t len = GetBlockLen(index); + Y_ENSURE(len % sizeof(T) == 0, "wrong data padding"); + return TArrayRef<const T>(reinterpret_cast<const T*>(GetBlock(index)), len / sizeof(T)); + } + + inline size_t GetBlocksCount() const { + return Offsets.size(); + } + +private: + inline void CheckIndex(size_t index) const { + if (index >= GetBlocksCount()) { + ythrow yexception() << "requested block " << index << " of " << GetBlocksCount() << " blocks"; + } + } + +private: + ui64 Version = 0; + TVector<const char*> Offsets; + TVector<size_t> Lengths; + size_t Size = 0; +}; diff --git a/library/cpp/on_disk/chunks/ut/ya.make b/library/cpp/on_disk/chunks/ut/ya.make new file mode 100644 index 0000000000..0190905cbe --- /dev/null +++ b/library/cpp/on_disk/chunks/ut/ya.make @@ -0,0 +1,9 @@ +UNITTEST_FOR(library/cpp/on_disk/chunks) + +OWNER(g:util) + +SRCS( + chunks_ut.cpp +) + +END() diff --git a/library/cpp/on_disk/chunks/writer.cpp b/library/cpp/on_disk/chunks/writer.cpp new file mode 100644 index 0000000000..6dc7397f09 --- /dev/null +++ b/library/cpp/on_disk/chunks/writer.cpp @@ -0,0 +1,46 @@ +#include <util/ysaveload.h> + +#include "writer.h" + +static inline void WriteAux(IOutputStream* out, const TVector<ui64>& data) { + ::SavePodArray(out, data.data(), data.size()); +} + +/*************************** TBuffersWriter ***************************/ + +TChunkedDataWriter::TChunkedDataWriter(IOutputStream& slave) + : Slave(slave) + , Offset(0) +{ +} + +TChunkedDataWriter::~TChunkedDataWriter() { +} + +void TChunkedDataWriter::NewBlock() { + if (Offsets.size()) { + Lengths.push_back(Offset - Offsets.back()); + } + + Pad(16); + Offsets.push_back(Offset); +} + +void TChunkedDataWriter::WriteFooter() { + Lengths.push_back(Offset - Offsets.back()); + WriteAux(this, Lengths); + WriteAux(this, Offsets); + WriteBinary<ui64>(Offsets.size()); + WriteBinary<ui64>(Version); + WriteBinary<ui64>(0); +} + +size_t TChunkedDataWriter::GetCurrentBlockOffset() const { + Y_ASSERT(!Offsets.empty()); + Y_ASSERT(Offset >= Offsets.back()); + return Offset - Offsets.back(); +} + +size_t TChunkedDataWriter::GetBlockCount() const { + return Offsets.size(); +} diff --git a/library/cpp/on_disk/chunks/writer.h b/library/cpp/on_disk/chunks/writer.h new file mode 100644 index 0000000000..ab14522bdd --- /dev/null +++ b/library/cpp/on_disk/chunks/writer.h @@ -0,0 +1,57 @@ +#pragma once + +#include <util/generic/vector.h> +#include <util/stream/output.h> + +template <typename T> +inline void WriteBin(IOutputStream* out, typename TTypeTraits<T>::TFuncParam t) { + out->Write(&t, sizeof(T)); +} + +class TChunkedDataWriter: public IOutputStream { +public: + TChunkedDataWriter(IOutputStream& slave); + ~TChunkedDataWriter() override; + + void NewBlock(); + + template <typename T> + inline void WriteBinary(typename TTypeTraits<T>::TFuncParam t) { + this->Write(&t, sizeof(T)); + } + + void WriteFooter(); + + size_t GetCurrentBlockOffset() const; + size_t GetBlockCount() const; + +protected: + void DoWrite(const void* buf, size_t len) override { + Slave.Write(buf, len); + Offset += len; + } + +private: + static inline size_t PaddingSize(size_t size, size_t boundary) noexcept { + const size_t boundaryViolation = size % boundary; + + return boundaryViolation == 0 ? 0 : boundary - boundaryViolation; + } + + inline void Pad(size_t boundary) { + const size_t newOffset = Offset + PaddingSize(Offset, boundary); + + while (Offset < newOffset) { + Write('\0'); + } + } + +private: + static const ui64 Version = 1; + + IOutputStream& Slave; + + size_t Offset; + TVector<ui64> Offsets; + TVector<ui64> Lengths; +}; diff --git a/library/cpp/on_disk/chunks/ya.make b/library/cpp/on_disk/chunks/ya.make new file mode 100644 index 0000000000..acb52df5b0 --- /dev/null +++ b/library/cpp/on_disk/chunks/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +OWNER(g:util) + +SRCS( + chunked_helpers.cpp + reader.cpp + writer.cpp +) + +END() |