diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /library/cpp/erasure | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt` - the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'library/cpp/erasure')
-rw-r--r-- | library/cpp/erasure/README.md | 9 | ||||
-rw-r--r-- | library/cpp/erasure/codec.cpp | 1 | ||||
-rw-r--r-- | library/cpp/erasure/codec.h | 80 | ||||
-rw-r--r-- | library/cpp/erasure/helpers.cpp | 80 | ||||
-rw-r--r-- | library/cpp/erasure/helpers.h | 30 | ||||
-rw-r--r-- | library/cpp/erasure/isa_erasure.cpp | 1 | ||||
-rw-r--r-- | library/cpp/erasure/isa_erasure.h | 170 | ||||
-rw-r--r-- | library/cpp/erasure/lrc.cpp | 1 | ||||
-rw-r--r-- | library/cpp/erasure/lrc.h | 326 | ||||
-rw-r--r-- | library/cpp/erasure/lrc_isa.cpp | 1 | ||||
-rw-r--r-- | library/cpp/erasure/lrc_isa.h | 77 | ||||
-rw-r--r-- | library/cpp/erasure/public.cpp | 1 | ||||
-rw-r--r-- | library/cpp/erasure/public.h | 57 | ||||
-rw-r--r-- | library/cpp/erasure/reed_solomon.cpp | 1 | ||||
-rw-r--r-- | library/cpp/erasure/reed_solomon.h | 60 | ||||
-rw-r--r-- | library/cpp/erasure/reed_solomon_isa.cpp | 1 | ||||
-rw-r--r-- | library/cpp/erasure/reed_solomon_isa.h | 69 | ||||
-rw-r--r-- | library/cpp/erasure/ya.make | 35 |
18 files changed, 1000 insertions, 0 deletions
diff --git a/library/cpp/erasure/README.md b/library/cpp/erasure/README.md new file mode 100644 index 0000000000..7723bf7c23 --- /dev/null +++ b/library/cpp/erasure/README.md @@ -0,0 +1,9 @@ +# Erasure based codecs for arbitrary data + +C++ wrapper for LRC and Reed-Solomon erasure codecs. +There are two backends for LRC: Jerasure(http://jerasure.org) and ISA-L(https://github.com/intel/isa-l). ISA-L is much faster - it considers different instruction sets to optimize speed of encode and decode. The only limitations now are if you don't have SSE4.2 instruction set (then base variant is as slow as Jerasure) or if you run it on aarch64 architecture (however, version 2.29 is going to support a fast implementation). However, we still have to keep Jerasure because it is incompatible due to some optimizations in it which affect data layout in coded blocks. +Also see https://wiki.yandex-team.ru/yt/userdoc/erasure/, https://wiki.yandex-team.ru/ignatijjkolesnichenko/yt/erasure/. + +It is possible to use codecs LRC 2k-2-2 and Reed Solomon n-k for any data stream. All you need is to provide CodecTraits (see `codecs_ut.cpp` for examples). Note that ISA-L only supports WordSize equal to 8. If you use Jerasure codecs with bigger WordSize than `MaxWordSize` in `public.h`, codec is not guaranteed to be thread-safe. + +You can use interface in `codec.h` or use the exact codec from `lrc_isa.h`, `lrc_jerasure.h` and `reed_solomon.h` if you like. diff --git a/library/cpp/erasure/codec.cpp b/library/cpp/erasure/codec.cpp new file mode 100644 index 0000000000..5fbcd720b2 --- /dev/null +++ b/library/cpp/erasure/codec.cpp @@ -0,0 +1 @@ +#include "codec.h" diff --git a/library/cpp/erasure/codec.h b/library/cpp/erasure/codec.h new file mode 100644 index 0000000000..c03d25e9c8 --- /dev/null +++ b/library/cpp/erasure/codec.h @@ -0,0 +1,80 @@ +#pragma once + +#include "public.h" + +#include <optional> +#include <vector> + +namespace NErasure { + +//! 
Describes a generic way to generate parity blocks from data blocks and +//! to recover (repair) missing blocks. +/*! + * Given N data blocks (numbered from 0 to N - 1) one can call #Encode to generate + * another M parity blocks (numbered from N to N + M - 1). + * + * If some of the resulting N + M blocks ever become missing one can attempt to + * repair the missing blocks by calling #Decode. + * + * Here N and M are fixed (codec-specific) parameters. + * Call #GetDataPartCount and #GetParityPartCount to figure out the + * values for N and M, respectively. + * + */ +template <class TBlobType> +struct ICodec { + //! Computes a sequence of parity blocks for given data blocks. + /*! + * The size of #blocks must be equal to #GetDataPartCount. + * The size of the returned array is equal to #GetParityPartCount. + */ + virtual std::vector<TBlobType> Encode(const std::vector<TBlobType>& blocks) const = 0; + + //! Decodes (repairs) missing blocks. + /*! + * #erasedIndices must contain the set of erased block indices. + * #blocks must contain known blocks (in the order specified by #GetRepairIndices). + * \returns The repaired blocks. + */ + virtual std::vector<TBlobType> Decode( + const std::vector<TBlobType>& blocks, + const TPartIndexList& erasedIndices) const = 0; + + //! Given a set of missing block indices, returns |true| if missing blocks can be repaired. + //! Due to performance reasons the elements of #erasedIndices must be unique and sorted. + virtual bool CanRepair(const TPartIndexList& erasedIndices) const = 0; + + //! Rapid version that works with set instead of list. + virtual bool CanRepair(const TPartIndexSet& erasedIndices) const = 0; + + //! Given a set of missing block indices, checks if missing blocks can be repaired. + /*! + * \returns + * If repair is not possible, returns |std::nullopt|. + * Otherwise returns the indices of blocks (both data and parity) to be passed to #Decode + * (in this very order). Not all known blocks may be needed for repair. 
+ */ + virtual std::optional<TPartIndexList> GetRepairIndices(const TPartIndexList& erasedIndices) const = 0; + + //! Returns the number of data blocks this codec can handle. + virtual int GetDataPartCount() const = 0; + + //! Returns the number of parity blocks this codec can handle. + virtual int GetParityPartCount() const = 0; + + //! Returns the maximum number of blocks that can always be repaired when missing. + virtual int GetGuaranteedRepairablePartCount() const = 0; + + //! Every block passed to this codec must have size divisible by the result of #GetWordSize. + virtual int GetWordSize() const = 0; + + // Extension methods + + //! Returns the sum of #GetDataPartCount and #GetParityPartCount. + int GetTotalPartCount() const { + return GetDataPartCount() + GetParityPartCount(); + } +}; + +} // namespace NErasure + diff --git a/library/cpp/erasure/helpers.cpp b/library/cpp/erasure/helpers.cpp new file mode 100644 index 0000000000..74edeca52c --- /dev/null +++ b/library/cpp/erasure/helpers.cpp @@ -0,0 +1,80 @@ +#include "helpers.h" + +#include <algorithm> +#include <iterator> + +namespace NErasure { + +TPartIndexList MakeSegment(int begin, int end) { + TPartIndexList result(end - begin); + for (int i = begin; i < end; ++i) { + result[i - begin] = i; + } + return result; +} + +TPartIndexList MakeSingleton(int elem) { + TPartIndexList result; + result.push_back(elem); + return result; +} + +TPartIndexList Difference(int begin, int end, const TPartIndexList& subtrahend) { + size_t pos = 0; + TPartIndexList result; + for (int i = begin; i < end; ++i) { + while (pos < subtrahend.size() && subtrahend[pos] < i) { + pos += 1; + } + if (pos == subtrahend.size() || subtrahend[pos] != i) { + result.push_back(i); + } + } + return result; +} + +TPartIndexList Difference(const TPartIndexList& first, const TPartIndexList& second) { + TPartIndexList result; + std::set_difference(first.begin(), first.end(), second.begin(), second.end(), std::back_inserter(result)); + return 
result; +} + +TPartIndexList Difference(const TPartIndexList& set, int subtrahend) { + return Difference(set, MakeSingleton(subtrahend)); +} + +TPartIndexList Intersection(const TPartIndexList& first, const TPartIndexList& second) { + TPartIndexList result; + std::set_intersection(first.begin(), first.end(), second.begin(), second.end(), std::back_inserter(result)); + return result; +} + +TPartIndexList Union(const TPartIndexList& first, const TPartIndexList& second) { + TPartIndexList result; + std::set_union(first.begin(), first.end(), second.begin(), second.end(), std::back_inserter(result)); + return result; +} + +bool Contains(const TPartIndexList& set, int elem) { + return std::binary_search(set.begin(), set.end(), elem); +} + +TPartIndexList UniqueSortedIndices(const TPartIndexList& indices) { + TPartIndexList copy = indices; + std::sort(copy.begin(), copy.end()); + copy.erase(std::unique(copy.begin(), copy.end()), copy.end()); + return copy; +} + +TPartIndexList ExtractRows(const TPartIndexList& matrix, int width, const TPartIndexList& rows) { + Y_ASSERT(matrix.size() % width == 0); + TPartIndexList result(width * rows.size()); + for (size_t i = 0; i < rows.size(); ++i) { + auto start = matrix.begin() + rows[i] * width; + std::copy(start, start + width, result.begin() + i * width); + } + return result; +} + +} // namespace NErasure + diff --git a/library/cpp/erasure/helpers.h b/library/cpp/erasure/helpers.h new file mode 100644 index 0000000000..741186322a --- /dev/null +++ b/library/cpp/erasure/helpers.h @@ -0,0 +1,30 @@ +#pragma once + +#include "public.h" + +namespace NErasure { + +// All vectors here are assumed to be sorted. 
+ +TPartIndexList MakeSegment(int begin, int end); + +TPartIndexList MakeSingleton(int elem); + +TPartIndexList Difference(int begin, int end, const TPartIndexList& subtrahend); + +TPartIndexList Difference(const TPartIndexList& first, const TPartIndexList& second); + +TPartIndexList Difference(const TPartIndexList& first, int elem); + +TPartIndexList Intersection(const TPartIndexList& first, const TPartIndexList& second); + +TPartIndexList Union(const TPartIndexList& first, const TPartIndexList& second); + +bool Contains(const TPartIndexList& set, int elem); + +TPartIndexList UniqueSortedIndices(const TPartIndexList& indices); + +TPartIndexList ExtractRows(const TPartIndexList& matrix, int width, const TPartIndexList& rows); + +} // namespace NErasure + diff --git a/library/cpp/erasure/isa_erasure.cpp b/library/cpp/erasure/isa_erasure.cpp new file mode 100644 index 0000000000..0b0199934e --- /dev/null +++ b/library/cpp/erasure/isa_erasure.cpp @@ -0,0 +1 @@ +#include "isa_erasure.h" diff --git a/library/cpp/erasure/isa_erasure.h b/library/cpp/erasure/isa_erasure.h new file mode 100644 index 0000000000..a7df61307f --- /dev/null +++ b/library/cpp/erasure/isa_erasure.h @@ -0,0 +1,170 @@ +#pragma once + +#include "public.h" + +#include "helpers.h" + +#include <library/cpp/yt/assert/assert.h> + +#include <util/generic/array_ref.h> +#include <util/generic/ptr.h> +#include <util/generic/singleton.h> + +#include <vector> + +extern "C" { + #include <contrib/libs/isa-l/include/erasure_code.h> +} + +namespace NErasure { + +template <class TBlobType> +static inline unsigned char* ConstCast(typename TBlobType::const_iterator blobIter) { + return const_cast<unsigned char*>(reinterpret_cast<const unsigned char*>(blobIter)); +} + +template <int DataPartCount, int ParityPartCount, class TCodecTraits, class TBlobType = typename TCodecTraits::TBlobType, class TMutableBlobType = typename TCodecTraits::TMutableBlobType> +std::vector<TBlobType> ISAErasureEncode( + const 
std::vector<unsigned char>& encodeGFTables, + const std::vector<TBlobType>& dataBlocks) +{ + YT_VERIFY(dataBlocks.size() == DataPartCount); + + size_t blockLength = dataBlocks.front().Size(); + for (size_t i = 1; i < dataBlocks.size(); ++i) { + YT_VERIFY(dataBlocks[i].Size() == blockLength); + } + + std::vector<unsigned char*> dataPointers; + for (const auto& block : dataBlocks) { + dataPointers.emplace_back(ConstCast<TBlobType>(block.Begin())); + } + + std::vector<TMutableBlobType> parities(ParityPartCount); + std::vector<unsigned char*> parityPointers(ParityPartCount); + for (size_t i = 0; i < ParityPartCount; ++i) { + parities[i] = TCodecTraits::AllocateBlob(blockLength); + parityPointers[i] = ConstCast<TBlobType>(parities[i].Begin()); + memset(parityPointers[i], 0, blockLength); + } + + ec_encode_data( + blockLength, + DataPartCount, + ParityPartCount, + const_cast<unsigned char*>(encodeGFTables.data()), + dataPointers.data(), + parityPointers.data()); + + return std::vector<TBlobType>(parities.begin(), parities.end()); +} + +template <int DataPartCount, int ParityPartCount, class TCodecTraits, class TBlobType = typename TCodecTraits::TBlobType, class TMutableBlobType = typename TCodecTraits::TMutableBlobType> +std::vector<TBlobType> ISAErasureDecode( + const std::vector<TBlobType>& dataBlocks, + const TPartIndexList& erasedIndices, + TConstArrayRef<TPartIndexList> groups, + const std::vector<unsigned char>& fullGeneratorMatrix) +{ + YT_VERIFY(dataBlocks.size() >= DataPartCount); + YT_VERIFY(erasedIndices.size() <= ParityPartCount); + + size_t blockLength = dataBlocks.front().Size(); + for (size_t i = 1; i < dataBlocks.size(); ++i) { + YT_VERIFY(dataBlocks[i].Size() == blockLength); + } + + std::vector<unsigned char> partialGeneratorMatrix(DataPartCount * DataPartCount, 0); + + std::vector<unsigned char*> recoveryBlocks; + for (size_t i = 0; i < DataPartCount; ++i) { + recoveryBlocks.emplace_back(ConstCast<TBlobType>(dataBlocks[i].Begin())); + } + + // Groups 
check is specific for LRC. + std::vector<int> isGroupHealthy(2, 1); + for (size_t i = 0; i < 2; ++i) { + for (const auto& index : erasedIndices) { + if (!groups.empty() && Contains(groups[0], index)) { + isGroupHealthy[0] = 0; + } else if (!groups.empty() && Contains(groups[1], index)) { + isGroupHealthy[1] = 0; + } + } + } + + // When a group is healthy we cannot use its local parity, thus skip it using gap. + size_t gap = 0; + size_t decodeMatrixIndex = 0; + size_t erasedBlockIndex = 0; + while (decodeMatrixIndex < DataPartCount) { + size_t globalIndex = decodeMatrixIndex + erasedBlockIndex + gap; + + if (erasedBlockIndex < erasedIndices.size() && + globalIndex == static_cast<size_t>(erasedIndices[erasedBlockIndex])) + { + ++erasedBlockIndex; + continue; + } + + if (!groups.empty() && globalIndex >= DataPartCount && globalIndex < DataPartCount + 2) { + if (Contains(groups[0], globalIndex) && isGroupHealthy[0]) { + ++gap; + continue; + } + if (Contains(groups[1], globalIndex) && isGroupHealthy[1]) { + ++gap; + continue; + } + } + + memcpy(&partialGeneratorMatrix[decodeMatrixIndex * DataPartCount], &fullGeneratorMatrix[globalIndex * DataPartCount], DataPartCount); + ++decodeMatrixIndex; + } + + std::vector<unsigned char> invertedGeneratorMatrix(DataPartCount * DataPartCount, 0); + int res = gf_invert_matrix(partialGeneratorMatrix.data(), invertedGeneratorMatrix.data(), DataPartCount); + YT_VERIFY(res == 0); + + std::vector<unsigned char> decodeMatrix(DataPartCount * (DataPartCount + ParityPartCount), 0); + + //! Some magical code from library example. 
+ for (size_t i = 0; i < erasedIndices.size(); ++i) { + if (erasedIndices[i] < DataPartCount) { + memcpy(&decodeMatrix[i * DataPartCount], &invertedGeneratorMatrix[erasedIndices[i] * DataPartCount], DataPartCount); + } else { + for (int k = 0; k < DataPartCount; ++k) { + int val = 0; + for (int j = 0; j < DataPartCount; ++j) { + val ^= gf_mul_erasure(invertedGeneratorMatrix[j * DataPartCount + k], fullGeneratorMatrix[DataPartCount * erasedIndices[i] + j]); + } + + decodeMatrix[DataPartCount * i + k] = val; + } + } + } + + std::vector<unsigned char> decodeGFTables(DataPartCount * erasedIndices.size() * 32); + ec_init_tables(DataPartCount, erasedIndices.size(), decodeMatrix.data(), decodeGFTables.data()); + + std::vector<TMutableBlobType> recoveredParts; + std::vector<unsigned char*> recoveredPartsPointers; + for (size_t i = 0; i < erasedIndices.size(); ++i) { + recoveredParts.emplace_back(TCodecTraits::AllocateBlob(blockLength)); + recoveredPartsPointers.emplace_back(ConstCast<TBlobType>(recoveredParts.back().Begin())); + memset(recoveredPartsPointers.back(), 0, blockLength); + } + + ec_encode_data( + blockLength, + DataPartCount, + erasedIndices.size(), + decodeGFTables.data(), + recoveryBlocks.data(), + recoveredPartsPointers.data()); + + return std::vector<TBlobType>(recoveredParts.begin(), recoveredParts.end()); +} + +} // namespace NErasure + diff --git a/library/cpp/erasure/lrc.cpp b/library/cpp/erasure/lrc.cpp new file mode 100644 index 0000000000..8c6a347091 --- /dev/null +++ b/library/cpp/erasure/lrc.cpp @@ -0,0 +1 @@ +#include "lrc.h" diff --git a/library/cpp/erasure/lrc.h b/library/cpp/erasure/lrc.h new file mode 100644 index 0000000000..15185a47f4 --- /dev/null +++ b/library/cpp/erasure/lrc.h @@ -0,0 +1,326 @@ +#pragma once + +#include "helpers.h" + +#include <library/cpp/sse/sse.h> + +#include <library/cpp/yt/assert/assert.h> + +#include <util/generic/array_ref.h> + +#include <algorithm> +#include <optional> + +namespace NErasure { + +template <class 
TCodecTraits, class TBlobType = typename TCodecTraits::TBlobType> +static inline TBlobType Xor(const std::vector<TBlobType>& refs) { + using TBufferType = typename TCodecTraits::TBufferType; + size_t size = refs.front().Size(); + TBufferType result = TCodecTraits::AllocateBuffer(size); // this also fills the buffer with zeros + for (const TBlobType& ref : refs) { + const char* data = reinterpret_cast<const char*>(ref.Begin()); + size_t pos = 0; +#ifdef ARCADIA_SSE + for (; pos + sizeof(__m128i) <= size; pos += sizeof(__m128i)) { + __m128i* dst = reinterpret_cast<__m128i*>(result.Begin() + pos); + const __m128i* src = reinterpret_cast<const __m128i*>(data + pos); + _mm_storeu_si128(dst, _mm_xor_si128(_mm_loadu_si128(src), _mm_loadu_si128(dst))); + } +#endif + for (; pos < size; ++pos) { + *(result.Begin() + pos) ^= data[pos]; + } + } + return TCodecTraits::FromBufferToBlob(std::move(result)); +} + +//! Locally Reconstructable Codes +/*! + * See https://www.usenix.org/conference/usenixfederatedconferencesweek/erasure-coding-windows-azure-storage + * for more details. + */ +template <int DataPartCount, int ParityPartCount, int WordSize, class TCodecTraits> +class TLrcCodecBase + : public ICodec<typename TCodecTraits::TBlobType> +{ + static_assert(DataPartCount % 2 == 0, "Data part count must be even."); + static_assert(ParityPartCount == 4, "Now we only support n-2-2 scheme for LRC codec"); + static_assert(1 + DataPartCount / 2 < (1 << (WordSize / 2)), "Data part count should be enough small to construct proper matrix."); +public: + //! Main blob for storing data. + using TBlobType = typename TCodecTraits::TBlobType; + //! Main mutable blob for decoding data. + using TMutableBlobType = typename TCodecTraits::TMutableBlobType; + + static constexpr ui64 RequiredDataAlignment = alignof(ui64); + + TLrcCodecBase() { + Groups_[0] = MakeSegment(0, DataPartCount / 2); + // Xor. 
+ Groups_[0].push_back(DataPartCount); + + Groups_[1] = MakeSegment(DataPartCount / 2, DataPartCount); + // Xor. + Groups_[1].push_back(DataPartCount + 1); + + constexpr int totalPartCount = DataPartCount + ParityPartCount; + if constexpr (totalPartCount <= BitmaskOptimizationThreshold) { + CanRepair_.resize(1 << totalPartCount); + for (int mask = 0; mask < (1 << totalPartCount); ++mask) { + TPartIndexList erasedIndices; + for (size_t i = 0; i < totalPartCount; ++i) { + if ((mask & (1 << i)) == 0) { + erasedIndices.push_back(i); + } + } + CanRepair_[mask] = CalculateCanRepair(erasedIndices); + } + } + } + + /*! Note that if you want to restore any internal data, blocks offsets must by WordSize * sizeof(long) aligned. + * Though it is possible to restore unaligned data if no more than one index in each Group is failed. See unittests for this case. + */ + std::vector<TBlobType> Decode( + const std::vector<TBlobType>& blocks, + const TPartIndexList& erasedIndices) const override + { + if (erasedIndices.empty()) { + return std::vector<TBlobType>(); + } + + size_t blockLength = blocks.front().Size(); + for (size_t i = 1; i < blocks.size(); ++i) { + YT_VERIFY(blocks[i].Size() == blockLength); + } + + TPartIndexList indices = UniqueSortedIndices(erasedIndices); + + // We can restore one block by xor. + if (indices.size() == 1) { + int index = erasedIndices.front(); + for (size_t i = 0; i < 2; ++i) { + if (Contains(Groups_[i], index)) { + return std::vector<TBlobType>(1, Xor<TCodecTraits>(blocks)); + } + } + } + + TPartIndexList recoveryIndices = GetRepairIndices(indices).value(); + // We can restore two blocks from different groups using xor. 
+ if (indices.size() == 2 && + indices.back() < DataPartCount + 2 && + recoveryIndices.back() < DataPartCount + 2) + { + std::vector<TBlobType> result; + for (int index : indices) { + for (size_t groupIndex = 0; groupIndex < 2; ++groupIndex) { + if (!Contains(Groups_[groupIndex], index)) { + continue; + } + + std::vector<TBlobType> correspondingBlocks; + for (int pos : Groups_[groupIndex]) { + for (size_t i = 0; i < blocks.size(); ++i) { + if (recoveryIndices[i] != pos) { + continue; + } + correspondingBlocks.push_back(blocks[i]); + } + } + + result.push_back(Xor<TCodecTraits>(correspondingBlocks)); + } + } + return result; + } + + return FallbackToCodecDecode(blocks, std::move(indices)); + } + + bool CanRepair(const TPartIndexList& erasedIndices) const final { + constexpr int totalPartCount = DataPartCount + ParityPartCount; + if constexpr (totalPartCount <= BitmaskOptimizationThreshold) { + int mask = (1 << (totalPartCount)) - 1; + for (int index : erasedIndices) { + mask &= ~(1 << index); + } + return CanRepair_[mask]; + } else { + return CalculateCanRepair(erasedIndices); + } + } + + bool CanRepair(const TPartIndexSet& erasedIndicesMask) const final { + constexpr int totalPartCount = DataPartCount + ParityPartCount; + if constexpr (totalPartCount <= BitmaskOptimizationThreshold) { + // CanRepair_ is indexed by the mask of surviving parts; drop the bits at or above totalPartCount that inversion sets. + return CanRepair_[(~erasedIndicesMask).to_ulong() & ((1UL << totalPartCount) - 1)]; + } else { + TPartIndexList erasedIndices; + for (size_t i = 0; i < erasedIndicesMask.size(); ++i) { + if (erasedIndicesMask[i]) { + erasedIndices.push_back(i); + } + } + return CalculateCanRepair(erasedIndices); + } + } + + std::optional<TPartIndexList> GetRepairIndices(const TPartIndexList& erasedIndices) const final { + if (erasedIndices.empty()) { + return TPartIndexList(); + } + + TPartIndexList indices = UniqueSortedIndices(erasedIndices); + + if (indices.size() > ParityPartCount) { + return std::nullopt; + } + + // One erasure from data or xor blocks. 
+ if (indices.size() == 1) { + int index = indices.front(); + for (size_t i = 0; i < 2; ++i) { + if (Contains(Groups_[i], index)) { + return Difference(Groups_[i], index); + } + } + } + + // Null if we have 4 erasures in one group. + if (indices.size() == ParityPartCount) { + bool intersectsAny = true; + for (size_t i = 0; i < 2; ++i) { + if (Intersection(indices, Groups_[i]).empty()) { + intersectsAny = false; + } + } + if (!intersectsAny) { + return std::nullopt; + } + } + + // Calculate coverage of each group. + int groupCoverage[2] = {}; + for (int index : indices) { + for (size_t i = 0; i < 2; ++i) { + if (Contains(Groups_[i], index)) { + ++groupCoverage[i]; + } + } + } + + // Two erasures, one in each group. + if (indices.size() == 2 && groupCoverage[0] == 1 && groupCoverage[1] == 1) { + return Difference(Union(Groups_[0], Groups_[1]), indices); + } + + // Erasures in only parity blocks. + if (indices.front() >= DataPartCount) { + return MakeSegment(0, DataPartCount); + } + + // Remove unnecessary xor parities. + TPartIndexList result = Difference(0, DataPartCount + ParityPartCount, indices); + for (size_t i = 0; i < 2; ++i) { + if (groupCoverage[i] == 0 && indices.size() <= 3) { + result = Difference(result, DataPartCount + i); + } + } + return result; + } + + int GetDataPartCount() const override { + return DataPartCount; + } + + int GetParityPartCount() const override { + return ParityPartCount; + } + + int GetGuaranteedRepairablePartCount() const override { + return ParityPartCount - 1; + } + + int GetWordSize() const override { + return WordSize * sizeof(long); + } + + virtual ~TLrcCodecBase() = default; + +protected: + // Indices of data blocks and corresponding xor (we have two xor parities). 
+ TConstArrayRef<TPartIndexList> GetXorGroups() const { + return Groups_; + } + + virtual std::vector<TBlobType> FallbackToCodecDecode( + const std::vector<TBlobType>& /* blocks */, + TPartIndexList /* erasedIndices */) const = 0; + + template <typename T> + void InitializeGeneratorMatrix(T* generatorMatrix, const std::function<T(T)>& GFSquare) { + for (int row = 0; row < ParityPartCount; ++row) { + for (int column = 0; column < DataPartCount; ++column) { + int index = row * DataPartCount + column; + + bool isFirstHalf = column < DataPartCount / 2; + if (row == 0) generatorMatrix[index] = isFirstHalf ? 1 : 0; + if (row == 1) generatorMatrix[index] = isFirstHalf ? 0 : 1; + + // Let alpha_i be coefficient of first half and beta_i of the second half. + // Then matrix is non-singular iff: + // a) alpha_i, beta_j != 0 + // b) alpha_i != beta_j + // c) alpha_i + alpha_k != beta_j + beta_l + // for any i, j, k, l. + if (row == 2) { + int shift = isFirstHalf ? 1 : (1 << (WordSize / 2)); + int relativeColumn = isFirstHalf ? column : (column - (DataPartCount / 2)); + generatorMatrix[index] = shift * (1 + relativeColumn); + } + + // The last row is the square of the row before last. + if (row == 3) { + auto prev = generatorMatrix[index - DataPartCount]; + generatorMatrix[index] = GFSquare(prev); + } + } + } + } + +private: + bool CalculateCanRepair(const TPartIndexList& erasedIndices) const { + TPartIndexList indices = UniqueSortedIndices(erasedIndices); + if (indices.size() > ParityPartCount) { + return false; + } + + if (indices.size() == 1) { + int index = indices.front(); + for (size_t i = 0; i < 2; ++i) { + if (Contains(Groups_[i], index)) { + return true; + } + } + } + + // If 4 indices miss in one block we cannot recover. 
+ if (indices.size() == ParityPartCount) { + for (size_t i = 0; i < 2; ++i) { + if (Intersection(indices, Groups_[i]).empty()) { + return false; + } + } + } + + return true; + } + + TPartIndexList Groups_[2]; + std::vector<bool> CanRepair_; +}; + +} // namespace NErasure diff --git a/library/cpp/erasure/lrc_isa.cpp b/library/cpp/erasure/lrc_isa.cpp new file mode 100644 index 0000000000..2068f840c8 --- /dev/null +++ b/library/cpp/erasure/lrc_isa.cpp @@ -0,0 +1 @@ +#include "lrc_isa.h" diff --git a/library/cpp/erasure/lrc_isa.h b/library/cpp/erasure/lrc_isa.h new file mode 100644 index 0000000000..800dc3c5ca --- /dev/null +++ b/library/cpp/erasure/lrc_isa.h @@ -0,0 +1,77 @@ +#pragma once + +#include "lrc.h" +#include "helpers.h" + +#include "isa_erasure.h" + +extern "C" { + #include <contrib/libs/isa-l/include/erasure_code.h> +} + +#include <library/cpp/sse/sse.h> + +#include <util/generic/array_ref.h> + +#include <optional> +#include <vector> + +namespace NErasure { + +template <int DataPartCount, int ParityPartCount, int WordSize, class TCodecTraits> +class TLrcIsa + : public TLrcCodecBase<DataPartCount, ParityPartCount, WordSize, TCodecTraits> +{ + static_assert(WordSize == 8, "ISA-l erasure codes support computations only in GF(2^8)"); +public: + //! Main blob for storing data. + using TBlobType = typename TCodecTraits::TBlobType; + //! Main mutable blob for decoding data. 
+ using TMutableBlobType = typename TCodecTraits::TMutableBlobType; + + static constexpr ui64 RequiredDataAlignment = alignof(ui64); + + TLrcIsa() + : TLrcCodecBase<DataPartCount, ParityPartCount, WordSize, TCodecTraits>() + { + EncodeGFTables_.resize(DataPartCount * ParityPartCount * 32, 0); + GeneratorMatrix_.resize((DataPartCount + ParityPartCount) * DataPartCount, 0); + + for (int row = 0; row < DataPartCount; ++row) { + GeneratorMatrix_[row * DataPartCount + row] = 1; + } + this->template InitializeGeneratorMatrix<typename decltype(GeneratorMatrix_)::value_type>( + &GeneratorMatrix_[DataPartCount * DataPartCount], + std::bind(&gf_mul_erasure, std::placeholders::_1, std::placeholders::_1)); + + ec_init_tables( + DataPartCount, + ParityPartCount, + &GeneratorMatrix_.data()[DataPartCount * DataPartCount], + EncodeGFTables_.data()); + } + + std::vector<TBlobType> Encode(const std::vector<TBlobType>& blocks) const override { + return ISAErasureEncode<DataPartCount, ParityPartCount, TCodecTraits, TBlobType, TMutableBlobType>(EncodeGFTables_, blocks); + } + + virtual ~TLrcIsa() = default; + +private: + std::vector<TBlobType> FallbackToCodecDecode( + const std::vector<TBlobType>& blocks, + TPartIndexList erasedIndices) const override + { + return ISAErasureDecode<DataPartCount, ParityPartCount, TCodecTraits, TBlobType, TMutableBlobType>( + blocks, + std::move(erasedIndices), + this->GetXorGroups(), + GeneratorMatrix_); + } + + std::vector<unsigned char> GeneratorMatrix_; + std::vector<unsigned char> EncodeGFTables_; +}; + +} // NErasure + diff --git a/library/cpp/erasure/public.cpp b/library/cpp/erasure/public.cpp new file mode 100644 index 0000000000..4df9bcaa18 --- /dev/null +++ b/library/cpp/erasure/public.cpp @@ -0,0 +1 @@ +#include "public.h" diff --git a/library/cpp/erasure/public.h b/library/cpp/erasure/public.h new file mode 100644 index 0000000000..d5cf01297b --- /dev/null +++ b/library/cpp/erasure/public.h @@ -0,0 +1,57 @@ +#pragma once + +#include 
<util/generic/buffer.h> +#include <util/generic/yexception.h> +#include <util/memory/blob.h> +#include <util/string/cast.h> +#include <util/system/src_root.h> + +#include <vector> + +#include <bitset> + +namespace NErasure { + +//! The maximum total number of blocks our erasure codec can handle. +static constexpr int MaxTotalPartCount = 16; + +//! Max word size to use. `w` in jerasure notation +static constexpr int MaxWordSize = 8; + +//! Max threshold to generate bitmask for CanRepair indices for LRC encoding. +static constexpr int BitmaskOptimizationThreshold = 22; + +//! A vector type for holding block indexes. +using TPartIndexList = std::vector<int>; + +//! Each bit corresponds to a possible block index. +using TPartIndexSet = std::bitset<MaxTotalPartCount>; + +template <class TBlobType> +struct ICodec; + +struct TDefaultCodecTraits { + using TBlobType = TBlob; + using TMutableBlobType = TBlob; + using TBufferType = TBuffer; + + static inline TMutableBlobType AllocateBlob(size_t size) { + TBufferType buffer(size); + buffer.Resize(size); + // The buffer is cleared after this call so no use after free. + return TBlob::FromBuffer(buffer); + } + + // AllocateBuffer must fill the memory with 0. 
+ static inline TBufferType AllocateBuffer(size_t size) { + TBufferType buffer(size); + buffer.Fill(0, size); + return buffer; + } + + static inline TBlobType FromBufferToBlob(TBufferType&& blob) { + return TBlobType::FromBuffer(blob); + } +}; + +} // namespace NErasure diff --git a/library/cpp/erasure/reed_solomon.cpp b/library/cpp/erasure/reed_solomon.cpp new file mode 100644 index 0000000000..68aa2acbfb --- /dev/null +++ b/library/cpp/erasure/reed_solomon.cpp @@ -0,0 +1 @@ +#include "reed_solomon.h" diff --git a/library/cpp/erasure/reed_solomon.h b/library/cpp/erasure/reed_solomon.h new file mode 100644 index 0000000000..a2e268e81d --- /dev/null +++ b/library/cpp/erasure/reed_solomon.h @@ -0,0 +1,60 @@ +#pragma once + +#include "helpers.h" + +#include <algorithm> +#include <optional> + +namespace NErasure { + +template <int DataPartCount, int ParityPartCount, int WordSize, class TCodecTraits> +class TReedSolomonBase + : public ICodec<typename TCodecTraits::TBlobType> +{ +public: + static constexpr ui64 RequiredDataAlignment = alignof(ui64); + + bool CanRepair(const TPartIndexList& erasedIndices) const final { + return erasedIndices.size() <= ParityPartCount; + } + + bool CanRepair(const TPartIndexSet& erasedIndices) const final { + return erasedIndices.count() <= static_cast<size_t>(ParityPartCount); + } + + std::optional<TPartIndexList> GetRepairIndices(const TPartIndexList& erasedIndices) const final { + if (erasedIndices.empty()) { + return TPartIndexList(); + } + + TPartIndexList indices = erasedIndices; + std::sort(indices.begin(), indices.end()); + indices.erase(std::unique(indices.begin(), indices.end()), indices.end()); + + if (indices.size() > static_cast<size_t>(ParityPartCount)) { + return std::nullopt; + } + + return Difference(0, DataPartCount + ParityPartCount, indices); + } + + int GetDataPartCount() const final { + return DataPartCount; + } + + int GetParityPartCount() const final { + return ParityPartCount; + } + + int 
GetGuaranteedRepairablePartCount() const final { + return ParityPartCount; + } + + int GetWordSize() const final { + return WordSize * sizeof(long); + } + + virtual ~TReedSolomonBase() = default; +}; + +} // namespace NErasure diff --git a/library/cpp/erasure/reed_solomon_isa.cpp b/library/cpp/erasure/reed_solomon_isa.cpp new file mode 100644 index 0000000000..eaffde54b5 --- /dev/null +++ b/library/cpp/erasure/reed_solomon_isa.cpp @@ -0,0 +1 @@ +#include "reed_solomon_isa.h" diff --git a/library/cpp/erasure/reed_solomon_isa.h b/library/cpp/erasure/reed_solomon_isa.h new file mode 100644 index 0000000000..73a3cd630f --- /dev/null +++ b/library/cpp/erasure/reed_solomon_isa.h @@ -0,0 +1,69 @@ +#pragma once + +#include "isa_erasure.h" +#include "reed_solomon.h" + +extern "C" { + #include <contrib/libs/isa-l/include/erasure_code.h> +} + +#include <util/generic/array_ref.h> + +#include <array> + +namespace NErasure { + +template <int DataPartCount, int ParityPartCount, int WordSize, class TCodecTraits> +class TReedSolomonIsa + : public TReedSolomonBase<DataPartCount, ParityPartCount, WordSize, TCodecTraits> +{ + static_assert(WordSize == 8, "ISA-l erasure codes support computations only in GF(2^8)"); +public: + //! Main blob for storing data. + using TBlobType = typename TCodecTraits::TBlobType; + //! Main mutable blob for decoding data. 
+ using TMutableBlobType = typename TCodecTraits::TMutableBlobType; + + TReedSolomonIsa() { + EncodeGFTables_.resize(DataPartCount * ParityPartCount * 32, 0); + GeneratorMatrix_.resize((DataPartCount + ParityPartCount) * DataPartCount, 0); + + gf_gen_rs_matrix( + GeneratorMatrix_.data(), + DataPartCount + ParityPartCount, + DataPartCount); + + ec_init_tables( + DataPartCount, + ParityPartCount, + &GeneratorMatrix_.data()[DataPartCount * DataPartCount], + EncodeGFTables_.data()); + } + + virtual std::vector<TBlobType> Encode(const std::vector<TBlobType>& blocks) const override { + return ISAErasureEncode<DataPartCount, ParityPartCount, TCodecTraits, TBlobType, TMutableBlobType>(EncodeGFTables_, blocks); + } + + virtual std::vector<TBlobType> Decode( + const std::vector<TBlobType>& blocks, + const TPartIndexList& erasedIndices) const override + { + if (erasedIndices.empty()) { + return std::vector<TBlobType>(); + } + + return ISAErasureDecode<DataPartCount, ParityPartCount, TCodecTraits, TBlobType, TMutableBlobType>( + blocks, + erasedIndices, + TConstArrayRef<TPartIndexList>(), + GeneratorMatrix_); + } + + virtual ~TReedSolomonIsa() = default; + +private: + std::vector<unsigned char> GeneratorMatrix_; + std::vector<unsigned char> EncodeGFTables_; +}; + +} // namespace NErasure diff --git a/library/cpp/erasure/ya.make b/library/cpp/erasure/ya.make new file mode 100644 index 0000000000..bde816b4d8 --- /dev/null +++ b/library/cpp/erasure/ya.make @@ -0,0 +1,35 @@ +LIBRARY() + +SRCS( + public.cpp + codec.cpp + helpers.cpp + + isa_erasure.cpp + + reed_solomon.cpp + reed_solomon_isa.cpp + + lrc.cpp + lrc_isa.cpp +) + +PEERDIR( + contrib/libs/isa-l/erasure_code + library/cpp/sse + library/cpp/yt/assert +) + +IF (NOT OPENSOURCE) + SRCS( + jerasure.cpp + reed_solomon_jerasure.cpp + lrc_jerasure.cpp + ) + + PEERDIR(contrib/libs/jerasure) +ENDIF() + +GENERATE_ENUM_SERIALIZATION(public.h) + +END() |