diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/blockcodecs | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/blockcodecs')
37 files changed, 1910 insertions, 0 deletions
diff --git a/library/cpp/blockcodecs/README.md b/library/cpp/blockcodecs/README.md new file mode 100644 index 00000000000..417917a475b --- /dev/null +++ b/library/cpp/blockcodecs/README.md @@ -0,0 +1,23 @@ +This is a simple library for block data compression (this means data is compressed/uncompressed +by whole blocks in memory). It's a lite version of `library/cpp/codecs`. Lite here means that it +provides only well-known compression algorithms, without the possibility of learning. + +There are two possible ways to work with it. + +Codec by name +============= +Use `NBlockCodecs::Codec` to obtain the codec by name. The codec can be asked to compress +or decompress data in various ways. + +To get a full list of codecs there is a function `NBlockCodecs::ListAllCodecs()`. + +Streaming +========= +Use `stream.h` to obtain simple streams over block codecs (buffer data, compress it by blocks, +write to the resulting stream). + +Using codec plugins +=================== +If you don't want your code to bloat from unused codecs, you can use the small version of the +library: `library/cpp/blockcodecs/core`. In that case, you need to manually set `PEERDIR()`s to +the needed codecs (e.g. `PEERDIR(library/cpp/blockcodecs/codecs/lzma)`). 
diff --git a/library/cpp/blockcodecs/codecs.cpp b/library/cpp/blockcodecs/codecs.cpp new file mode 100644 index 00000000000..fdec4809d37 --- /dev/null +++ b/library/cpp/blockcodecs/codecs.cpp @@ -0,0 +1 @@ +#include "codecs.h" diff --git a/library/cpp/blockcodecs/codecs.h b/library/cpp/blockcodecs/codecs.h new file mode 100644 index 00000000000..fd499b54b0d --- /dev/null +++ b/library/cpp/blockcodecs/codecs.h @@ -0,0 +1,3 @@ +#pragma once + +#include <library/cpp/blockcodecs/core/codecs.h> diff --git a/library/cpp/blockcodecs/codecs/brotli/brotli.cpp b/library/cpp/blockcodecs/codecs/brotli/brotli.cpp new file mode 100644 index 00000000000..6e3cd971bdc --- /dev/null +++ b/library/cpp/blockcodecs/codecs/brotli/brotli.cpp @@ -0,0 +1,67 @@ +#include <library/cpp/blockcodecs/core/codecs.h> +#include <library/cpp/blockcodecs/core/common.h> +#include <library/cpp/blockcodecs/core/register.h> + +#include <contrib/libs/brotli/include/brotli/encode.h> +#include <contrib/libs/brotli/include/brotli/decode.h> + +using namespace NBlockCodecs; + +namespace { + struct TBrotliCodec : public TAddLengthCodec<TBrotliCodec> { + static constexpr int BEST_QUALITY = 11; + + inline TBrotliCodec(ui32 level) + : Quality(level) + , MyName(TStringBuf("brotli_") + ToString(level)) + { + } + + static inline size_t DoMaxCompressedLength(size_t l) noexcept { + return BrotliEncoderMaxCompressedSize(l); + } + + inline size_t DoCompress(const TData& in, void* out) const { + size_t resultSize = MaxCompressedLength(in); + auto result = BrotliEncoderCompress( + /*quality*/ Quality, + /*window*/ BROTLI_DEFAULT_WINDOW, + /*mode*/ BrotliEncoderMode::BROTLI_MODE_GENERIC, + /*input_size*/ in.size(), + /*input_buffer*/ (const unsigned char*)(in.data()), + /*encoded_size*/ &resultSize, + /*encoded_buffer*/ static_cast<unsigned char*>(out)); + if (result != BROTLI_TRUE) { + ythrow yexception() << "internal brotli error during compression"; + } + + return resultSize; + } + + inline void DoDecompress(const TData& 
in, void* out, size_t dsize) const { + size_t decoded = dsize; + auto result = BrotliDecoderDecompress(in.size(), (const unsigned char*)in.data(), &decoded, static_cast<unsigned char*>(out)); + if (result != BROTLI_DECODER_RESULT_SUCCESS) { + ythrow yexception() << "internal brotli error during decompression"; + } else if (decoded != dsize) { + ythrow TDecompressError(dsize, decoded); + } + } + + TStringBuf Name() const noexcept override { + return MyName; + } + + const int Quality = BEST_QUALITY; + const TString MyName; + }; + + struct TBrotliRegistrar { + TBrotliRegistrar() { + for (int i = 1; i <= TBrotliCodec::BEST_QUALITY; ++i) { + RegisterCodec(MakeHolder<TBrotliCodec>(i)); + } + } + }; + const TBrotliRegistrar Registrar{}; +} diff --git a/library/cpp/blockcodecs/codecs/brotli/ya.make b/library/cpp/blockcodecs/codecs/brotli/ya.make new file mode 100644 index 00000000000..17aff0bb722 --- /dev/null +++ b/library/cpp/blockcodecs/codecs/brotli/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +OWNER(pg) + +PEERDIR( + contrib/libs/brotli/enc + contrib/libs/brotli/dec + library/cpp/blockcodecs/core +) + +SRCS( + GLOBAL brotli.cpp +) + +END() diff --git a/library/cpp/blockcodecs/codecs/bzip/bzip.cpp b/library/cpp/blockcodecs/codecs/bzip/bzip.cpp new file mode 100644 index 00000000000..3a5cfdd0e9a --- /dev/null +++ b/library/cpp/blockcodecs/codecs/bzip/bzip.cpp @@ -0,0 +1,62 @@ +#include <library/cpp/blockcodecs/core/codecs.h> +#include <library/cpp/blockcodecs/core/common.h> +#include <library/cpp/blockcodecs/core/register.h> + +#include <contrib/libs/libbz2/bzlib.h> + +using namespace NBlockCodecs; + +namespace { + struct TBZipCodec: public TAddLengthCodec<TBZipCodec> { + inline TBZipCodec(int level) + : Level(level) + , MyName("bzip2-" + ToString(Level)) + { + } + + static inline size_t DoMaxCompressedLength(size_t in) noexcept { + // very strange + return in * 2 + 128; + } + + TStringBuf Name() const noexcept override { + return MyName; + } + + inline size_t DoCompress(const 
TData& in, void* buf) const { + unsigned int ret = DoMaxCompressedLength(in.size()); + const int res = BZ2_bzBuffToBuffCompress((char*)buf, &ret, (char*)in.data(), in.size(), Level, 0, 0); + if (res != BZ_OK) { + ythrow TCompressError(res); + } + + return ret; + } + + inline void DoDecompress(const TData& in, void* out, size_t len) const { + unsigned int tmp = SafeIntegerCast<unsigned int>(len); + const int res = BZ2_bzBuffToBuffDecompress((char*)out, &tmp, (char*)in.data(), in.size(), 0, 0); + + if (res != BZ_OK) { + ythrow TDecompressError(res); + } + + if (len != tmp) { + ythrow TDecompressError(len, tmp); + } + } + + const int Level; + const TString MyName; + }; + + struct TBZipRegistrar { + TBZipRegistrar() { + for (int i = 1; i < 10; ++i) { + RegisterCodec(MakeHolder<TBZipCodec>(i)); + } + RegisterAlias("bzip2", "bzip2-6"); + } + }; + const TBZipRegistrar Registrar{}; +} diff --git a/library/cpp/blockcodecs/codecs/bzip/ya.make b/library/cpp/blockcodecs/codecs/bzip/ya.make new file mode 100644 index 00000000000..f0a8aefd62d --- /dev/null +++ b/library/cpp/blockcodecs/codecs/bzip/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +OWNER(pg) + +PEERDIR( + contrib/libs/libbz2 + library/cpp/blockcodecs/core +) + +SRCS( + GLOBAL bzip.cpp +) + +END() diff --git a/library/cpp/blockcodecs/codecs/fastlz/fastlz.cpp b/library/cpp/blockcodecs/codecs/fastlz/fastlz.cpp new file mode 100644 index 00000000000..da2831fbd24 --- /dev/null +++ b/library/cpp/blockcodecs/codecs/fastlz/fastlz.cpp @@ -0,0 +1,54 @@ +#include <library/cpp/blockcodecs/core/codecs.h> +#include <library/cpp/blockcodecs/core/common.h> +#include <library/cpp/blockcodecs/core/register.h> + +#include <contrib/libs/fastlz/fastlz.h> + +using namespace NBlockCodecs; + +namespace { + struct TFastLZCodec: public TAddLengthCodec<TFastLZCodec> { + inline TFastLZCodec(int level) + : MyName("fastlz-" + ToString(level)) + , Level(level) + { + } + + static inline size_t DoMaxCompressedLength(size_t in) noexcept { + return 
Max<size_t>(in + in / 20, 128); + } + + TStringBuf Name() const noexcept override { + return MyName; + } + + inline size_t DoCompress(const TData& in, void* buf) const { + if (Level) { + return fastlz_compress_level(Level, in.data(), in.size(), buf); + } + + return fastlz_compress(in.data(), in.size(), buf); + } + + inline void DoDecompress(const TData& in, void* out, size_t len) const { + const int ret = fastlz_decompress(in.data(), in.size(), out, len); + + if (ret < 0 || (size_t)ret != len) { + ythrow TDataError() << TStringBuf("can not decompress"); + } + } + + const TString MyName; + const int Level; + }; + + struct TFastLZRegistrar { + TFastLZRegistrar() { + for (int i = 0; i < 3; ++i) { + RegisterCodec(MakeHolder<TFastLZCodec>(i)); + } + RegisterAlias("fastlz", "fastlz-0"); + } + }; + const TFastLZRegistrar Registrar{}; +} diff --git a/library/cpp/blockcodecs/codecs/fastlz/ya.make b/library/cpp/blockcodecs/codecs/fastlz/ya.make new file mode 100644 index 00000000000..59c09b329b3 --- /dev/null +++ b/library/cpp/blockcodecs/codecs/fastlz/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +OWNER(pg) + +PEERDIR( + contrib/libs/fastlz + library/cpp/blockcodecs/core +) + +SRCS( + GLOBAL fastlz.cpp +) + +END() diff --git a/library/cpp/blockcodecs/codecs/legacy_zstd06/legacy_zstd06.cpp b/library/cpp/blockcodecs/codecs/legacy_zstd06/legacy_zstd06.cpp new file mode 100644 index 00000000000..042f031679c --- /dev/null +++ b/library/cpp/blockcodecs/codecs/legacy_zstd06/legacy_zstd06.cpp @@ -0,0 +1,58 @@ +#include <library/cpp/blockcodecs/core/codecs.h> +#include <library/cpp/blockcodecs/core/common.h> +#include <library/cpp/blockcodecs/core/register.h> + +#include <contrib/libs/zstd06/common/zstd.h> +#include <contrib/libs/zstd06/common/zstd_static.h> + +using namespace NBlockCodecs; + +namespace { + struct TZStd06Codec: public TAddLengthCodec<TZStd06Codec> { + inline TZStd06Codec(unsigned level) + : Level(level) + , MyName(TStringBuf("zstd06_") + ToString(Level)) + { + } + + static 
inline size_t CheckError(size_t ret, const char* what) { + if (ZSTD_isError(ret)) { + ythrow yexception() << what << TStringBuf(" zstd error: ") << ZSTD_getErrorName(ret); + } + + return ret; + } + + static inline size_t DoMaxCompressedLength(size_t l) noexcept { + return ZSTD_compressBound(l); + } + + inline size_t DoCompress(const TData& in, void* out) const { + return CheckError(ZSTD_compress(out, DoMaxCompressedLength(in.size()), in.data(), in.size(), Level), "compress"); + } + + inline void DoDecompress(const TData& in, void* out, size_t dsize) const { + const size_t res = CheckError(ZSTD_decompress(out, dsize, in.data(), in.size()), "decompress"); + + if (res != dsize) { + ythrow TDecompressError(dsize, res); + } + } + + TStringBuf Name() const noexcept override { + return MyName; + } + + const unsigned Level; + const TString MyName; + }; + + struct TZStd06Registrar { + TZStd06Registrar() { + for (unsigned i = 1; i <= ZSTD_maxCLevel(); ++i) { + RegisterCodec(MakeHolder<TZStd06Codec>(i)); + } + } + }; + const TZStd06Registrar Registrar{}; +} diff --git a/library/cpp/blockcodecs/codecs/legacy_zstd06/ya.make b/library/cpp/blockcodecs/codecs/legacy_zstd06/ya.make new file mode 100644 index 00000000000..067f7312330 --- /dev/null +++ b/library/cpp/blockcodecs/codecs/legacy_zstd06/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +OWNER(pg) + +PEERDIR( + contrib/libs/zstd06 + library/cpp/blockcodecs/core +) + +SRCS( + GLOBAL legacy_zstd06.cpp +) + +END() diff --git a/library/cpp/blockcodecs/codecs/lz4/lz4.cpp b/library/cpp/blockcodecs/codecs/lz4/lz4.cpp new file mode 100644 index 00000000000..fbf0fe110f1 --- /dev/null +++ b/library/cpp/blockcodecs/codecs/lz4/lz4.cpp @@ -0,0 +1,123 @@ +#include <library/cpp/blockcodecs/core/codecs.h> +#include <library/cpp/blockcodecs/core/common.h> +#include <library/cpp/blockcodecs/core/register.h> + +#include <contrib/libs/lz4/lz4.h> +#include <contrib/libs/lz4/lz4hc.h> +#include <contrib/libs/lz4/generated/iface.h> + +using namespace 
NBlockCodecs; + +namespace { + struct TLz4Base { + static inline size_t DoMaxCompressedLength(size_t in) { + return LZ4_compressBound(SafeIntegerCast<int>(in)); + } + }; + + struct TLz4FastCompress { + inline TLz4FastCompress(int memory) + : Memory(memory) + , Methods(LZ4Methods(Memory)) + { + } + + inline size_t DoCompress(const TData& in, void* buf) const { + return Methods->LZ4CompressLimited(in.data(), (char*)buf, in.size(), LZ4_compressBound(in.size())); + } + + inline TString CPrefix() { + return "fast" + ToString(Memory); + } + + const int Memory; + const TLZ4Methods* Methods; + }; + + struct TLz4BestCompress { + inline size_t DoCompress(const TData& in, void* buf) const { + return LZ4_compress_HC(in.data(), (char*)buf, in.size(), LZ4_compressBound(in.size()), 0); + } + + static inline TString CPrefix() { + return "hc"; + } + }; + + struct TLz4FastDecompress { + inline void DoDecompress(const TData& in, void* out, size_t len) const { + ssize_t res = LZ4_decompress_fast(in.data(), (char*)out, len); + if (res < 0) { + ythrow TDecompressError(res); + } + } + + static inline TStringBuf DPrefix() { + return TStringBuf("fast"); + } + }; + + struct TLz4SafeDecompress { + inline void DoDecompress(const TData& in, void* out, size_t len) const { + ssize_t res = LZ4_decompress_safe(in.data(), (char*)out, in.size(), len); + if (res < 0) { + ythrow TDecompressError(res); + } + } + + static inline TStringBuf DPrefix() { + return TStringBuf("safe"); + } + }; + + template <class TC, class TD> + struct TLz4Codec: public TAddLengthCodec<TLz4Codec<TC, TD>>, public TLz4Base, public TC, public TD { + inline TLz4Codec() + : MyName("lz4-" + TC::CPrefix() + "-" + TD::DPrefix()) + { + } + + template <class T> + inline TLz4Codec(const T& t) + : TC(t) + , MyName("lz4-" + TC::CPrefix() + "-" + TD::DPrefix()) + { + } + + TStringBuf Name() const noexcept override { + return MyName; + } + + const TString MyName; + }; + + struct TLz4Registrar { + TLz4Registrar() { + for (int i = 0; i < 30; 
++i) { + typedef TLz4Codec<TLz4FastCompress, TLz4FastDecompress> T1; + typedef TLz4Codec<TLz4FastCompress, TLz4SafeDecompress> T2; + + THolder<T1> t1(new T1(i)); + THolder<T2> t2(new T2(i)); + + if (t1->Methods) { + RegisterCodec(std::move(t1)); + } + + if (t2->Methods) { + RegisterCodec(std::move(t2)); + } + } + + RegisterCodec(MakeHolder<TLz4Codec<TLz4BestCompress, TLz4FastDecompress>>()); + RegisterCodec(MakeHolder<TLz4Codec<TLz4BestCompress, TLz4SafeDecompress>>()); + + RegisterAlias("lz4-fast-safe", "lz4-fast14-safe"); + RegisterAlias("lz4-fast-fast", "lz4-fast14-fast"); + RegisterAlias("lz4", "lz4-fast-safe"); + RegisterAlias("lz4fast", "lz4-fast-fast"); + RegisterAlias("lz4hc", "lz4-hc-safe"); + } + }; + const TLz4Registrar Registrar{}; +} diff --git a/library/cpp/blockcodecs/codecs/lz4/ya.make b/library/cpp/blockcodecs/codecs/lz4/ya.make new file mode 100644 index 00000000000..f2471d7d96f --- /dev/null +++ b/library/cpp/blockcodecs/codecs/lz4/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +OWNER(pg) + +PEERDIR( + contrib/libs/lz4 + contrib/libs/lz4/generated + library/cpp/blockcodecs/core +) + +SRCS( + GLOBAL lz4.cpp +) + +END() diff --git a/library/cpp/blockcodecs/codecs/lzma/lzma.cpp b/library/cpp/blockcodecs/codecs/lzma/lzma.cpp new file mode 100644 index 00000000000..6c8d5fded42 --- /dev/null +++ b/library/cpp/blockcodecs/codecs/lzma/lzma.cpp @@ -0,0 +1,74 @@ +#include <library/cpp/blockcodecs/core/codecs.h> +#include <library/cpp/blockcodecs/core/common.h> +#include <library/cpp/blockcodecs/core/register.h> + +#include <contrib/libs/lzmasdk/LzmaLib.h> + +using namespace NBlockCodecs; + +namespace { + struct TLzmaCodec: public TAddLengthCodec<TLzmaCodec> { + inline TLzmaCodec(int level) + : Level(level) + , MyName("lzma-" + ToString(Level)) + { + } + + static inline size_t DoMaxCompressedLength(size_t in) noexcept { + return Max<size_t>(in + in / 20, 128) + LZMA_PROPS_SIZE; + } + + TStringBuf Name() const noexcept override { + return MyName; + } + + inline 
size_t DoCompress(const TData& in, void* buf) const { + unsigned char* props = (unsigned char*)buf; + unsigned char* data = props + LZMA_PROPS_SIZE; + size_t destLen = Max<size_t>(); + size_t outPropsSize = LZMA_PROPS_SIZE; + + const int ret = LzmaCompress(data, &destLen, (const unsigned char*)in.data(), in.size(), props, &outPropsSize, Level, 0, -1, -1, -1, -1, -1); + + if (ret != SZ_OK) { + ythrow TCompressError(ret); + } + + return destLen + LZMA_PROPS_SIZE; + } + + inline void DoDecompress(const TData& in, void* out, size_t len) const { + if (in.size() <= LZMA_PROPS_SIZE) { + ythrow TDataError() << TStringBuf("broken lzma stream"); + } + + const unsigned char* props = (const unsigned char*)in.data(); + const unsigned char* data = props + LZMA_PROPS_SIZE; + size_t destLen = len; + SizeT srcLen = in.size() - LZMA_PROPS_SIZE; + + const int res = LzmaUncompress((unsigned char*)out, &destLen, data, &srcLen, props, LZMA_PROPS_SIZE); + + if (res != SZ_OK) { + ythrow TDecompressError(res); + } + + if (destLen != len) { + ythrow TDecompressError(len, destLen); + } + } + + const int Level; + const TString MyName; + }; + + struct TLzmaRegistrar { + TLzmaRegistrar() { + for (int i = 0; i < 10; ++i) { + RegisterCodec(MakeHolder<TLzmaCodec>(i)); + } + RegisterAlias("lzma", "lzma-5"); + } + }; + const TLzmaRegistrar Registrar{}; +} diff --git a/library/cpp/blockcodecs/codecs/lzma/ya.make b/library/cpp/blockcodecs/codecs/lzma/ya.make new file mode 100644 index 00000000000..e145834da68 --- /dev/null +++ b/library/cpp/blockcodecs/codecs/lzma/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +OWNER(pg) + +PEERDIR( + contrib/libs/lzmasdk + library/cpp/blockcodecs/core +) + +SRCS( + GLOBAL lzma.cpp +) + +END() diff --git a/library/cpp/blockcodecs/codecs/snappy/snappy.cpp b/library/cpp/blockcodecs/codecs/snappy/snappy.cpp new file mode 100644 index 00000000000..f6be05a05fc --- /dev/null +++ b/library/cpp/blockcodecs/codecs/snappy/snappy.cpp @@ -0,0 +1,52 @@ +#include 
<library/cpp/blockcodecs/core/codecs.h> +#include <library/cpp/blockcodecs/core/common.h> +#include <library/cpp/blockcodecs/core/register.h> + +#include <contrib/libs/snappy/snappy.h> + +using namespace NBlockCodecs; + +namespace { + struct TSnappyCodec: public ICodec { + size_t DecompressedLength(const TData& in) const override { + size_t ret; + + if (snappy::GetUncompressedLength(in.data(), in.size(), &ret)) { + return ret; + } + + ythrow TDecompressError(0); + } + + size_t MaxCompressedLength(const TData& in) const override { + return snappy::MaxCompressedLength(in.size()); + } + + size_t Compress(const TData& in, void* out) const override { + size_t ret; + + snappy::RawCompress(in.data(), in.size(), (char*)out, &ret); + + return ret; + } + + size_t Decompress(const TData& in, void* out) const override { + if (snappy::RawUncompress(in.data(), in.size(), (char*)out)) { + return DecompressedLength(in); + } + + ythrow TDecompressError(0); + } + + TStringBuf Name() const noexcept override { + return "snappy"; + } + }; + + struct TSnappyRegistrar { + TSnappyRegistrar() { + RegisterCodec(MakeHolder<TSnappyCodec>()); + } + }; + const TSnappyRegistrar Registrar{}; +} diff --git a/library/cpp/blockcodecs/codecs/snappy/ya.make b/library/cpp/blockcodecs/codecs/snappy/ya.make new file mode 100644 index 00000000000..0cf2be2f944 --- /dev/null +++ b/library/cpp/blockcodecs/codecs/snappy/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +OWNER(pg) + +PEERDIR( + contrib/libs/snappy + library/cpp/blockcodecs/core +) + +SRCS( + GLOBAL snappy.cpp +) + +END() diff --git a/library/cpp/blockcodecs/codecs/zlib/ya.make b/library/cpp/blockcodecs/codecs/zlib/ya.make new file mode 100644 index 00000000000..9f04995f667 --- /dev/null +++ b/library/cpp/blockcodecs/codecs/zlib/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +OWNER(pg) + +PEERDIR( + contrib/libs/zlib + library/cpp/blockcodecs/core +) + +SRCS( + GLOBAL zlib.cpp +) + +END() diff --git a/library/cpp/blockcodecs/codecs/zlib/zlib.cpp 
b/library/cpp/blockcodecs/codecs/zlib/zlib.cpp new file mode 100644 index 00000000000..cdb556c36d4 --- /dev/null +++ b/library/cpp/blockcodecs/codecs/zlib/zlib.cpp @@ -0,0 +1,64 @@ +#include <library/cpp/blockcodecs/core/codecs.h> +#include <library/cpp/blockcodecs/core/common.h> +#include <library/cpp/blockcodecs/core/register.h> + +#include <contrib/libs/zlib/zlib.h> + +using namespace NBlockCodecs; + +namespace { + struct TZLibCodec: public TAddLengthCodec<TZLibCodec> { + inline TZLibCodec(int level) + : MyName("zlib-" + ToString(level)) + , Level(level) + { + } + + static inline size_t DoMaxCompressedLength(size_t in) noexcept { + return compressBound(in); + } + + TStringBuf Name() const noexcept override { + return MyName; + } + + inline size_t DoCompress(const TData& in, void* buf) const { + //TRASH detected + uLong ret = Max<unsigned int>(); + + int cres = compress2((Bytef*)buf, &ret, (const Bytef*)in.data(), in.size(), Level); + + if (cres != Z_OK) { + ythrow TCompressError(cres); + } + + return ret; + } + + inline void DoDecompress(const TData& in, void* out, size_t len) const { + uLong ret = len; + + int uncres = uncompress((Bytef*)out, &ret, (const Bytef*)in.data(), in.size()); + if (uncres != Z_OK) { + ythrow TDecompressError(uncres); + } + + if (ret != len) { + ythrow TDecompressError(len, ret); + } + } + + const TString MyName; + const int Level; + }; + + struct TZLibRegistrar { + TZLibRegistrar() { + for (int i = 0; i < 10; ++i) { + RegisterCodec(MakeHolder<TZLibCodec>(i)); + } + RegisterAlias("zlib", "zlib-6"); + } + }; + const TZLibRegistrar Registrar{}; +} diff --git a/library/cpp/blockcodecs/codecs/zstd/ya.make b/library/cpp/blockcodecs/codecs/zstd/ya.make new file mode 100644 index 00000000000..c077dd47b74 --- /dev/null +++ b/library/cpp/blockcodecs/codecs/zstd/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +OWNER(pg) + +PEERDIR( + contrib/libs/zstd + library/cpp/blockcodecs/core +) + +SRCS( + GLOBAL zstd.cpp +) + +END() diff --git 
a/library/cpp/blockcodecs/codecs/zstd/zstd.cpp b/library/cpp/blockcodecs/codecs/zstd/zstd.cpp new file mode 100644 index 00000000000..95299b3f6d3 --- /dev/null +++ b/library/cpp/blockcodecs/codecs/zstd/zstd.cpp @@ -0,0 +1,59 @@ +#include <library/cpp/blockcodecs/core/codecs.h> +#include <library/cpp/blockcodecs/core/common.h> +#include <library/cpp/blockcodecs/core/register.h> + +#define ZSTD_STATIC_LINKING_ONLY +#include <contrib/libs/zstd/include/zstd.h> + +using namespace NBlockCodecs; + +namespace { + struct TZStd08Codec: public TAddLengthCodec<TZStd08Codec> { + inline TZStd08Codec(unsigned level) + : Level(level) + , MyName(TStringBuf("zstd08_") + ToString(Level)) + { + } + + static inline size_t CheckError(size_t ret, const char* what) { + if (ZSTD_isError(ret)) { + ythrow yexception() << what << TStringBuf(" zstd error: ") << ZSTD_getErrorName(ret); + } + + return ret; + } + + static inline size_t DoMaxCompressedLength(size_t l) noexcept { + return ZSTD_compressBound(l); + } + + inline size_t DoCompress(const TData& in, void* out) const { + return CheckError(ZSTD_compress(out, DoMaxCompressedLength(in.size()), in.data(), in.size(), Level), "compress"); + } + + inline void DoDecompress(const TData& in, void* out, size_t dsize) const { + const size_t res = CheckError(ZSTD_decompress(out, dsize, in.data(), in.size()), "decompress"); + + if (res != dsize) { + ythrow TDecompressError(dsize, res); + } + } + + TStringBuf Name() const noexcept override { + return MyName; + } + + const unsigned Level; + const TString MyName; + }; + + struct TZStd08Registrar { + TZStd08Registrar() { + for (int i = 1; i <= ZSTD_maxCLevel(); ++i) { + RegisterCodec(MakeHolder<TZStd08Codec>(i)); + RegisterAlias("zstd_" + ToString(i), "zstd08_" + ToString(i)); + } + } + }; + const TZStd08Registrar Registrar{}; +} diff --git a/library/cpp/blockcodecs/codecs_ut.cpp b/library/cpp/blockcodecs/codecs_ut.cpp new file mode 100644 index 00000000000..bfe5a236909 --- /dev/null +++ 
b/library/cpp/blockcodecs/codecs_ut.cpp @@ -0,0 +1,340 @@ +#include "codecs.h" +#include "stream.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/stream/str.h> +#include <util/string/join.h> +#include <util/digest/multi.h> + +Y_UNIT_TEST_SUITE(TBlockCodecsTest) { + using namespace NBlockCodecs; + + TBuffer Buffer(TStringBuf b) { + TBuffer bb; + bb.Assign(b.data(), b.size()); + return bb; + } + + void TestAllAtOnce(size_t n, size_t m) { + TVector<TBuffer> datas; + + datas.emplace_back(); + datas.push_back(Buffer("na gorshke sidel korol")); + datas.push_back(Buffer(TStringBuf("", 1))); + datas.push_back(Buffer(" ")); + datas.push_back(Buffer(" ")); + datas.push_back(Buffer(" ")); + datas.push_back(Buffer(" ")); + + { + TStringStream data; + + for (size_t i = 0; i < 1024; ++i) { + data << " " << i; + } + + datas.push_back(Buffer(data.Str())); + } + + TCodecList lst = ListAllCodecs(); + + for (size_t i = 0; i < lst.size(); ++i) { + const ICodec* c = Codec(lst[i]); + const auto h = MultiHash(c->Name(), i, 1); + + if (h % n == m) { + } else { + continue; + } + + for (size_t j = 0; j < datas.size(); ++j) { + const TBuffer& data = datas[j]; + TString res; + + try { + TBuffer e, d; + c->Encode(data, e); + c->Decode(e, d); + d.AsString(res); + UNIT_ASSERT_EQUAL(NBlockCodecs::TData(res), NBlockCodecs::TData(data)); + } catch (...) 
{ + Cerr << c->Name() << "(" << res.Quote() << ")(" << TString{NBlockCodecs::TData(data)}.Quote() << ")" << Endl; + + throw; + } + } + } + } + + Y_UNIT_TEST(TestAllAtOnce0) { + TestAllAtOnce(20, 0); + } + + Y_UNIT_TEST(TestAllAtOnce1) { + TestAllAtOnce(20, 1); + } + + Y_UNIT_TEST(TestAllAtOnce2) { + TestAllAtOnce(20, 2); + } + + Y_UNIT_TEST(TestAllAtOnce3) { + TestAllAtOnce(20, 3); + } + + Y_UNIT_TEST(TestAllAtOnce4) { + TestAllAtOnce(20, 4); + } + + Y_UNIT_TEST(TestAllAtOnce5) { + TestAllAtOnce(20, 5); + } + + Y_UNIT_TEST(TestAllAtOnce6) { + TestAllAtOnce(20, 6); + } + + Y_UNIT_TEST(TestAllAtOnce7) { + TestAllAtOnce(20, 7); + } + + Y_UNIT_TEST(TestAllAtOnce8) { + TestAllAtOnce(20, 8); + } + + Y_UNIT_TEST(TestAllAtOnce9) { + TestAllAtOnce(20, 9); + } + + Y_UNIT_TEST(TestAllAtOnce10) { + TestAllAtOnce(20, 10); + } + + Y_UNIT_TEST(TestAllAtOnce12) { + TestAllAtOnce(20, 12); + } + + Y_UNIT_TEST(TestAllAtOnce13) { + TestAllAtOnce(20, 13); + } + + Y_UNIT_TEST(TestAllAtOnce14) { + TestAllAtOnce(20, 14); + } + + Y_UNIT_TEST(TestAllAtOnce15) { + TestAllAtOnce(20, 15); + } + + Y_UNIT_TEST(TestAllAtOnce16) { + TestAllAtOnce(20, 16); + } + + Y_UNIT_TEST(TestAllAtOnce17) { + TestAllAtOnce(20, 17); + } + + Y_UNIT_TEST(TestAllAtOnce18) { + TestAllAtOnce(20, 18); + } + + Y_UNIT_TEST(TestAllAtOnce19) { + TestAllAtOnce(20, 19); + } + + void TestStreams(size_t n, size_t m) { + TVector<TString> datas; + TString res; + + for (size_t i = 0; i < 256; ++i) { + datas.push_back(TString(i, (char)(i % 128))); + } + + for (size_t i = 0; i < datas.size(); ++i) { + res += datas[i]; + } + + TCodecList lst = ListAllCodecs(); + + for (size_t i = 0; i < lst.size(); ++i) { + TStringStream ss; + + const ICodec* c = Codec(lst[i]); + const auto h = MultiHash(c->Name(), i, 2); + + if (h % n == m) { + } else { + continue; + } + + { + TCodedOutput out(&ss, c, 1234); + + for (size_t j = 0; j < datas.size(); ++j) { + out << datas[j]; + } + + out.Finish(); + } + + const TString resNew = 
TDecodedInput(&ss).ReadAll(); + + try { + UNIT_ASSERT_EQUAL(resNew, res); + } catch (...) { + Cerr << c->Name() << Endl; + + throw; + } + } + } + + Y_UNIT_TEST(TestStreams0) { + TestStreams(20, 0); + } + + Y_UNIT_TEST(TestStreams1) { + TestStreams(20, 1); + } + + Y_UNIT_TEST(TestStreams2) { + TestStreams(20, 2); + } + + Y_UNIT_TEST(TestStreams3) { + TestStreams(20, 3); + } + + Y_UNIT_TEST(TestStreams4) { + TestStreams(20, 4); + } + + Y_UNIT_TEST(TestStreams5) { + TestStreams(20, 5); + } + + Y_UNIT_TEST(TestStreams6) { + TestStreams(20, 6); + } + + Y_UNIT_TEST(TestStreams7) { + TestStreams(20, 7); + } + + Y_UNIT_TEST(TestStreams8) { + TestStreams(20, 8); + } + + Y_UNIT_TEST(TestStreams9) { + TestStreams(20, 9); + } + + Y_UNIT_TEST(TestStreams10) { + TestStreams(20, 10); + } + + Y_UNIT_TEST(TestStreams11) { + TestStreams(20, 11); + } + + Y_UNIT_TEST(TestStreams12) { + TestStreams(20, 12); + } + + Y_UNIT_TEST(TestStreams13) { + TestStreams(20, 13); + } + + Y_UNIT_TEST(TestStreams14) { + TestStreams(20, 14); + } + + Y_UNIT_TEST(TestStreams15) { + TestStreams(20, 15); + } + + Y_UNIT_TEST(TestStreams16) { + TestStreams(20, 16); + } + + Y_UNIT_TEST(TestStreams17) { + TestStreams(20, 17); + } + + Y_UNIT_TEST(TestStreams18) { + TestStreams(20, 18); + } + + Y_UNIT_TEST(TestStreams19) { + TestStreams(20, 19); + } + + Y_UNIT_TEST(TestMaxPossibleDecompressedSize) { + + UNIT_ASSERT_VALUES_EQUAL(GetMaxPossibleDecompressedLength(), Max<size_t>()); + + TVector<char> input(10001, ' '); + TCodecList codecs = ListAllCodecs(); + SetMaxPossibleDecompressedLength(10000); + + for (const auto& codec : codecs) { + const ICodec* c = Codec(codec); + TBuffer inputBuffer(input.data(), input.size()); + TBuffer output; + TBuffer decompressed; + c->Encode(inputBuffer, output); + UNIT_ASSERT_EXCEPTION(c->Decode(output, decompressed), yexception); + } + + // restore status quo + SetMaxPossibleDecompressedLength(Max<size_t>()); + } + + Y_UNIT_TEST(TestListAllCodecs) { + static const TString 
ALL_CODECS = + "brotli_1,brotli_10,brotli_11,brotli_2,brotli_3,brotli_4,brotli_5,brotli_6,brotli_7,brotli_8,brotli_9," + + "bzip2,bzip2-1,bzip2-2,bzip2-3,bzip2-4,bzip2-5,bzip2-6,bzip2-7,bzip2-8,bzip2-9," + + "fastlz,fastlz-0,fastlz-1,fastlz-2," + + "lz4,lz4-fast-fast,lz4-fast-safe,lz4-fast10-fast,lz4-fast10-safe,lz4-fast11-fast,lz4-fast11-safe," + "lz4-fast12-fast,lz4-fast12-safe,lz4-fast13-fast,lz4-fast13-safe,lz4-fast14-fast,lz4-fast14-safe," + "lz4-fast15-fast,lz4-fast15-safe,lz4-fast16-fast,lz4-fast16-safe,lz4-fast17-fast,lz4-fast17-safe," + "lz4-fast18-fast,lz4-fast18-safe,lz4-fast19-fast,lz4-fast19-safe,lz4-fast20-fast,lz4-fast20-safe," + "lz4-hc-fast,lz4-hc-safe,lz4fast,lz4hc," + + "lzma,lzma-0,lzma-1,lzma-2,lzma-3,lzma-4,lzma-5,lzma-6,lzma-7,lzma-8,lzma-9," + + "null," + + "snappy," + + "zlib,zlib-0,zlib-1,zlib-2,zlib-3,zlib-4,zlib-5,zlib-6,zlib-7,zlib-8,zlib-9," + + "zstd06_1,zstd06_10,zstd06_11,zstd06_12,zstd06_13,zstd06_14,zstd06_15,zstd06_16,zstd06_17,zstd06_18," + "zstd06_19,zstd06_2,zstd06_20,zstd06_21,zstd06_22,zstd06_3,zstd06_4,zstd06_5,zstd06_6,zstd06_7,zstd06_8," + "zstd06_9," + + "zstd08_1,zstd08_10,zstd08_11,zstd08_12,zstd08_13,zstd08_14,zstd08_15,zstd08_16,zstd08_17,zstd08_18," + "zstd08_19,zstd08_2,zstd08_20,zstd08_21,zstd08_22,zstd08_3,zstd08_4,zstd08_5,zstd08_6,zstd08_7,zstd08_8," + "zstd08_9,zstd_1,zstd_10,zstd_11,zstd_12,zstd_13,zstd_14,zstd_15,zstd_16,zstd_17,zstd_18,zstd_19,zstd_2," + "zstd_20,zstd_21,zstd_22,zstd_3,zstd_4,zstd_5,zstd_6,zstd_7,zstd_8,zstd_9"; + + UNIT_ASSERT_VALUES_EQUAL(ALL_CODECS, JoinSeq(",", ListAllCodecs())); + } + + Y_UNIT_TEST(TestEncodeDecodeIntoString) { + TStringBuf data = "na gorshke sidel korol"; + + TCodecList codecs = ListAllCodecs(); + for (const auto& codec : codecs) { + const ICodec* c = Codec(codec); + TString encoded = c->Encode(data); + TString decoded = c->Decode(encoded); + + UNIT_ASSERT_VALUES_EQUAL(decoded, data); + } + } +} diff --git a/library/cpp/blockcodecs/core/codecs.cpp 
b/library/cpp/blockcodecs/core/codecs.cpp new file mode 100644 index 00000000000..21506e812b4 --- /dev/null +++ b/library/cpp/blockcodecs/core/codecs.cpp @@ -0,0 +1,148 @@ +#include "codecs.h" +#include "common.h" +#include "register.h" + +#include <util/ysaveload.h> +#include <util/stream/null.h> +#include <util/stream/mem.h> +#include <util/string/cast.h> +#include <util/string/join.h> +#include <util/system/align.h> +#include <util/system/unaligned_mem.h> +#include <util/generic/hash.h> +#include <util/generic/cast.h> +#include <util/generic/deque.h> +#include <util/generic/buffer.h> +#include <util/generic/array_ref.h> +#include <util/generic/singleton.h> +#include <util/generic/algorithm.h> +#include <util/generic/mem_copy.h> + +using namespace NBlockCodecs; + +namespace { + + struct TCodecFactory { + inline TCodecFactory() { + Add(&Null); + } + + inline const ICodec* Find(const TStringBuf& name) const { + auto it = Registry.find(name); + + if (it == Registry.end()) { + ythrow TNotFound() << "can not found " << name << " codec"; + } + + return it->second; + } + + inline void ListCodecs(TCodecList& lst) const { + for (const auto& it : Registry) { + lst.push_back(it.first); + } + + Sort(lst.begin(), lst.end()); + } + + inline void Add(ICodec* codec) { + Registry[codec->Name()] = codec; + } + + inline void Add(TCodecPtr codec) { + Codecs.push_back(std::move(codec)); + Add(Codecs.back().Get()); + } + + inline void Alias(TStringBuf from, TStringBuf to) { + Tmp.emplace_back(from); + Registry[Tmp.back()] = Registry[to]; + } + + TDeque<TString> Tmp; + TNullCodec Null; + TVector<TCodecPtr> Codecs; + typedef THashMap<TStringBuf, ICodec*> TRegistry; + TRegistry Registry; + + // SEARCH-8344: Global decompressed size limiter (to prevent remote DoS) + size_t MaxPossibleDecompressedLength = Max<size_t>(); + }; +} + +const ICodec* NBlockCodecs::Codec(const TStringBuf& name) { + return Singleton<TCodecFactory>()->Find(name); +} + +TCodecList NBlockCodecs::ListAllCodecs() { + 
TCodecList ret; + + Singleton<TCodecFactory>()->ListCodecs(ret); + + return ret; +} + +TString NBlockCodecs::ListAllCodecsAsString() { + return JoinSeq(TStringBuf(","), ListAllCodecs()); +} + +void NBlockCodecs::RegisterCodec(TCodecPtr codec) { + Singleton<TCodecFactory>()->Add(std::move(codec)); +} + +void NBlockCodecs::RegisterAlias(TStringBuf from, TStringBuf to) { + Singleton<TCodecFactory>()->Alias(from, to); +} + +void NBlockCodecs::SetMaxPossibleDecompressedLength(size_t maxPossibleDecompressedLength) { + Singleton<TCodecFactory>()->MaxPossibleDecompressedLength = maxPossibleDecompressedLength; +} + +size_t NBlockCodecs::GetMaxPossibleDecompressedLength() { + return Singleton<TCodecFactory>()->MaxPossibleDecompressedLength; +} + +size_t ICodec::GetDecompressedLength(const TData& in) const { + const size_t len = DecompressedLength(in); + + Y_ENSURE( + len <= NBlockCodecs::GetMaxPossibleDecompressedLength(), + "Attempt to decompress the block that is larger than maximum possible decompressed length, " + "see SEARCH-8344 for details. 
" + ); + return len; +} + +void ICodec::Encode(const TData& in, TBuffer& out) const { + const size_t maxLen = MaxCompressedLength(in); + + out.Reserve(maxLen); + out.Resize(Compress(in, out.Data())); +} + +void ICodec::Decode(const TData& in, TBuffer& out) const { + const size_t len = GetDecompressedLength(in); + + out.Reserve(len); + out.Resize(Decompress(in, out.Data())); +} + +void ICodec::Encode(const TData& in, TString& out) const { + const size_t maxLen = MaxCompressedLength(in); + out.ReserveAndResize(maxLen); + + size_t actualLen = Compress(in, out.begin()); + Y_ASSERT(actualLen <= maxLen); + out.resize(actualLen); +} + +void ICodec::Decode(const TData& in, TString& out) const { + const size_t maxLen = GetDecompressedLength(in); + out.ReserveAndResize(maxLen); + + size_t actualLen = Decompress(in, out.begin()); + Y_ASSERT(actualLen <= maxLen); + out.resize(actualLen); +} + +ICodec::~ICodec() = default; diff --git a/library/cpp/blockcodecs/core/codecs.h b/library/cpp/blockcodecs/core/codecs.h new file mode 100644 index 00000000000..9c93c002748 --- /dev/null +++ b/library/cpp/blockcodecs/core/codecs.h @@ -0,0 +1,90 @@ +#pragma once + +#include <util/generic/buffer.h> +#include <util/generic/strbuf.h> +#include <util/generic/string.h> +#include <util/generic/typetraits.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> + +namespace NBlockCodecs { + struct TData: public TStringBuf { + inline TData() = default; + + Y_HAS_MEMBER(Data); + Y_HAS_MEMBER(Size); + + template <class T, std::enable_if_t<!THasSize<T>::value || !THasData<T>::value, int> = 0> + inline TData(const T& t) + : TStringBuf((const char*)t.data(), t.size()) + { + } + + template <class T, std::enable_if_t<THasSize<T>::value && THasData<T>::value, int> = 0> + inline TData(const T& t) + : TStringBuf((const char*)t.Data(), t.Size()) + { + } + }; + + struct TCodecError: public yexception { + }; + + struct TNotFound: public TCodecError { + }; + + struct TDataError: public 
TCodecError { + }; + + struct ICodec { + virtual ~ICodec(); + + // main interface + virtual size_t DecompressedLength(const TData& in) const = 0; + virtual size_t MaxCompressedLength(const TData& in) const = 0; + virtual size_t Compress(const TData& in, void* out) const = 0; + virtual size_t Decompress(const TData& in, void* out) const = 0; + + virtual TStringBuf Name() const noexcept = 0; + + // some useful helpers + void Encode(const TData& in, TBuffer& out) const; + void Decode(const TData& in, TBuffer& out) const; + + void Encode(const TData& in, TString& out) const; + void Decode(const TData& in, TString& out) const; + + inline TString Encode(const TData& in) const { + TString out; + + Encode(in, out); + + return out; + } + + inline TString Decode(const TData& in) const { + TString out; + + Decode(in, out); + + return out; + } + private: + size_t GetDecompressedLength(const TData& in) const; + }; + + using TCodecPtr = THolder<ICodec>; + + const ICodec* Codec(const TStringBuf& name); + + // some aux methods + typedef TVector<TStringBuf> TCodecList; + TCodecList ListAllCodecs(); + TString ListAllCodecsAsString(); + + // SEARCH-8344: Get the size of max possible decompressed block + size_t GetMaxPossibleDecompressedLength(); + // SEARCH-8344: Globally set the size of max possible decompressed block + void SetMaxPossibleDecompressedLength(size_t maxPossibleDecompressedLength); + +} diff --git a/library/cpp/blockcodecs/core/common.h b/library/cpp/blockcodecs/core/common.h new file mode 100644 index 00000000000..f05df4d3341 --- /dev/null +++ b/library/cpp/blockcodecs/core/common.h @@ -0,0 +1,105 @@ +#pragma once + +#include "codecs.h" + +#include <util/ysaveload.h> +#include <util/stream/null.h> +#include <util/stream/mem.h> +#include <util/string/cast.h> +#include <util/string/join.h> +#include <util/system/align.h> +#include <util/system/unaligned_mem.h> +#include <util/generic/hash.h> +#include <util/generic/cast.h> +#include <util/generic/buffer.h> +#include 
<util/generic/array_ref.h> +#include <util/generic/singleton.h> +#include <util/generic/algorithm.h> +#include <util/generic/mem_copy.h> + +namespace NBlockCodecs { + struct TDecompressError: public TDataError { + TDecompressError(int code) { + *this << "cannot decompress (errcode " << code << ")"; + } + + TDecompressError(size_t exp, size_t real) { + *this << "broken input (expected len: " << exp << ", got: " << real << ")"; + } + }; + + struct TCompressError: public TDataError { + TCompressError(int code) { + *this << "cannot compress (errcode " << code << ")"; + } + }; + + struct TNullCodec: public ICodec { + size_t DecompressedLength(const TData& in) const override { + return in.size(); + } + + size_t MaxCompressedLength(const TData& in) const override { + return in.size(); + } + + size_t Compress(const TData& in, void* out) const override { + MemCopy((char*)out, in.data(), in.size()); + + return in.size(); + } + + size_t Decompress(const TData& in, void* out) const override { + MemCopy((char*)out, in.data(), in.size()); + + return in.size(); + } + + TStringBuf Name() const noexcept override { + return TStringBuf("null"); + } + }; + + template <class T> + struct TAddLengthCodec: public ICodec { + static inline void Check(const TData& in) { + if (in.size() < sizeof(ui64)) { + ythrow TDataError() << "too small input"; + } + } + + size_t DecompressedLength(const TData& in) const override { + Check(in); + + return ReadUnaligned<ui64>(in.data()); + } + + size_t MaxCompressedLength(const TData& in) const override { + return T::DoMaxCompressedLength(in.size()) + sizeof(ui64); + } + + size_t Compress(const TData& in, void* out) const override { + ui64* ptr = (ui64*)out; + + WriteUnaligned<ui64>(ptr, (ui64) in.size()); + + return Base()->DoCompress(!in ? 
TData(TStringBuf("")) : in, ptr + 1) + sizeof(*ptr); + } + + size_t Decompress(const TData& in, void* out) const override { + Check(in); + + const auto len = ReadUnaligned<ui64>(in.data()); + + if (!len) + return 0; + + Base()->DoDecompress(TData(in).Skip(sizeof(len)), out, len); + return len; + } + + inline const T* Base() const noexcept { + return static_cast<const T*>(this); + } + }; +} diff --git a/library/cpp/blockcodecs/core/register.h b/library/cpp/blockcodecs/core/register.h new file mode 100644 index 00000000000..fa1186dd705 --- /dev/null +++ b/library/cpp/blockcodecs/core/register.h @@ -0,0 +1,10 @@ +#pragma once + +#include "codecs.h" + +namespace NBlockCodecs{ + + void RegisterCodec(TCodecPtr codec); + void RegisterAlias(TStringBuf from, TStringBuf to); + +} diff --git a/library/cpp/blockcodecs/core/stream.cpp b/library/cpp/blockcodecs/core/stream.cpp new file mode 100644 index 00000000000..4f7db3c32be --- /dev/null +++ b/library/cpp/blockcodecs/core/stream.cpp @@ -0,0 +1,212 @@ +#include "stream.h" +#include "codecs.h" + +#include <util/digest/murmur.h> +#include <util/generic/scope.h> +#include <util/generic/cast.h> +#include <util/generic/hash.h> +#include <util/generic/singleton.h> +#include <util/stream/mem.h> +#include <util/ysaveload.h> + +using namespace NBlockCodecs; + +namespace { + constexpr size_t MAX_BUF_LEN = 128 * 1024 * 1024; + + typedef ui16 TCodecID; + typedef ui64 TBlockLen; + + struct TIds { + inline TIds() { + const TCodecList lst = ListAllCodecs(); + + for (size_t i = 0; i < lst.size(); ++i) { + const ICodec* c = Codec(lst[i]); + + ByID[CodecID(c)] = c; + } + } + + static inline TCodecID CodecID(const ICodec* c) { + const TStringBuf name = c->Name(); + + union { + ui16 Parts[2]; + ui32 Data; + } x; + + x.Data = MurmurHash<ui32>(name.data(), name.size()); + + return x.Parts[1] ^ x.Parts[0]; + } + + inline const ICodec* Find(TCodecID id) const { + TByID::const_iterator it = ByID.find(id); + + if (it != ByID.end()) { + return 
it->second; + } + + ythrow yexception() << "can not find codec by id " << id; + } + + typedef THashMap<TCodecID, const ICodec*> TByID; + TByID ByID; + }; + + TCodecID CodecID(const ICodec* c) { + return TIds::CodecID(c); + } + + const ICodec* CodecByID(TCodecID id) { + return Singleton<TIds>()->Find(id); + } +} + +TCodedOutput::TCodedOutput(IOutputStream* out, const ICodec* c, size_t bufLen) + : C_(c) + , D_(bufLen) + , S_(out) +{ + if (bufLen > MAX_BUF_LEN) { + ythrow yexception() << TStringBuf("too big buffer size: ") << bufLen; + } +} + +TCodedOutput::~TCodedOutput() { + try { + Finish(); + } catch (...) { + } +} + +void TCodedOutput::DoWrite(const void* buf, size_t len) { + const char* in = (const char*)buf; + + while (len) { + const size_t avail = D_.Avail(); + + if (len < avail) { + D_.Append(in, len); + + return; + } + + D_.Append(in, avail); + + Y_ASSERT(!D_.Avail()); + + in += avail; + len -= avail; + + Y_VERIFY(FlushImpl(), "flush on writing failed"); + } +} + +bool TCodedOutput::FlushImpl() { + const bool ret = !D_.Empty(); + const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen); + O_.Reserve(C_->MaxCompressedLength(D_) + payload); + + void* out = O_.Data() + payload; + const size_t olen = C_->Compress(D_, out); + + { + TMemoryOutput mo(O_.Data(), payload); + + ::Save(&mo, CodecID(C_)); + ::Save(&mo, SafeIntegerCast<TBlockLen>(olen)); + } + + S_->Write(O_.Data(), payload + olen); + + D_.Clear(); + O_.Clear(); + + return ret; +} + +void TCodedOutput::DoFlush() { + if (S_ && !D_.Empty()) { + FlushImpl(); + } +} + +void TCodedOutput::DoFinish() { + if (S_) { + Y_DEFER { + S_ = nullptr; + }; + + if (FlushImpl()) { + //always write zero-length block as eos marker + FlushImpl(); + } + } +} + +TDecodedInput::TDecodedInput(IInputStream* in) + : S_(in) + , C_(nullptr) +{ +} + +TDecodedInput::TDecodedInput(IInputStream* in, const ICodec* codec) + : S_(in) + , C_(codec) +{ +} + +TDecodedInput::~TDecodedInput() = default; + +size_t 
TDecodedInput::DoUnboundedNext(const void** ptr) { + if (!S_) { + return 0; + } + + TCodecID codecId; + TBlockLen blockLen; + + { + const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen); + char buf[32]; + + S_->LoadOrFail(buf, payload); + + TMemoryInput in(buf, payload); + + ::Load(&in, codecId); + ::Load(&in, blockLen); + } + + if (!blockLen) { + S_ = nullptr; + + return 0; + } + + if (Y_UNLIKELY(blockLen > 1024 * 1024 * 1024)) { + ythrow yexception() << "block size exceeds 1 GiB"; + } + + TBuffer block; + block.Resize(blockLen); + + S_->LoadOrFail(block.Data(), blockLen); + + auto codec = CodecByID(codecId); + + if (C_) { + Y_ENSURE(C_->Name() == codec->Name(), TStringBuf("incorrect stream codec")); + } + + if (codec->DecompressedLength(block) > MAX_BUF_LEN) { + ythrow yexception() << "broken stream"; + } + + codec->Decode(block, D_); + *ptr = D_.Data(); + + return D_.Size(); +} diff --git a/library/cpp/blockcodecs/core/stream.h b/library/cpp/blockcodecs/core/stream.h new file mode 100644 index 00000000000..fd44ef88f2c --- /dev/null +++ b/library/cpp/blockcodecs/core/stream.h @@ -0,0 +1,46 @@ +#pragma once + +#include <util/stream/walk.h> +#include <util/stream/input.h> +#include <util/stream/output.h> +#include <util/stream/zerocopy.h> +#include <util/generic/buffer.h> + +namespace NBlockCodecs { + struct ICodec; + + class TCodedOutput: public IOutputStream { + public: + TCodedOutput(IOutputStream* out, const ICodec* c, size_t bufLen); + ~TCodedOutput() override; + + private: + void DoWrite(const void* buf, size_t len) override; + void DoFlush() override; + void DoFinish() override; + + bool FlushImpl(); + + private: + const ICodec* C_; + TBuffer D_; + TBuffer O_; + IOutputStream* S_; + }; + + class TDecodedInput: public IWalkInput { + public: + TDecodedInput(IInputStream* in); + TDecodedInput(IInputStream* in, const ICodec* codec); + + ~TDecodedInput() override; + + private: + size_t DoUnboundedNext(const void** ptr) override; + + private: + TBuffer D_; + 
IInputStream* S_; + const ICodec* C_; + }; +} diff --git a/library/cpp/blockcodecs/core/ya.make b/library/cpp/blockcodecs/core/ya.make new file mode 100644 index 00000000000..069e15927bf --- /dev/null +++ b/library/cpp/blockcodecs/core/ya.make @@ -0,0 +1,10 @@ +LIBRARY() + +OWNER(pg) + +SRCS( + codecs.cpp + stream.cpp +) + +END() diff --git a/library/cpp/blockcodecs/fuzz/main.cpp b/library/cpp/blockcodecs/fuzz/main.cpp new file mode 100644 index 00000000000..763c6c5a10b --- /dev/null +++ b/library/cpp/blockcodecs/fuzz/main.cpp @@ -0,0 +1,84 @@ +#include <contrib/libs/protobuf-mutator/src/libfuzzer/libfuzzer_macro.h> +#include <google/protobuf/stubs/logging.h> + +#include <library/cpp/blockcodecs/codecs.h> +#include <library/cpp/blockcodecs/fuzz/proto/case.pb.h> +#include <library/cpp/blockcodecs/stream.h> + +#include <util/stream/input.h> +#include <util/stream/length.h> +#include <util/stream/mem.h> +#include <util/stream/null.h> +#include <util/stream/str.h> + +using NBlockCodecs::NFuzz::TPackUnpackCase; +using NBlockCodecs::TCodedOutput; +using NBlockCodecs::TDecodedInput; + +static void ValidateBufferSize(const ui32 size) { + Y_ENSURE(size > 0 && size <= 16ULL * 1024); +} + +static void DoOnlyDecode(const TPackUnpackCase& case_) { + if (!case_.GetPacked()) { + return; + } + + TMemoryInput mi(case_.GetData().data(), case_.GetData().size()); + TDecodedInput di(&mi); + TNullOutput no; + TCountingOutput cno(&no); + TransferData(&di, &cno); +} + +static void DoDecodeEncode(const TPackUnpackCase& case_) { + auto* const codec = NBlockCodecs::Codec(case_.GetCodecName()); + Y_ENSURE(codec); + + TMemoryInput mi(case_.GetData().data(), case_.GetData().size()); + TDecodedInput di(&mi, codec); + TStringStream decoded; + TransferData(&di, &decoded); + TNullOutput no; + TCountingOutput cno(&no); + TCodedOutput co(&cno, codec, case_.GetBufferLength()); + TransferData(&decoded, &co); + co.Flush(); + + Y_VERIFY((case_.GetData().size() > 0) == (cno.Counter() > 0)); + 
Y_VERIFY((case_.GetData().size() > 0) == (decoded.Str().size() > 0)); +} + +static void DoEncodeDecode(const TPackUnpackCase& case_) { + auto* const codec = NBlockCodecs::Codec(case_.GetCodecName()); + Y_ENSURE(codec); + + TMemoryInput mi(case_.GetData().data(), case_.GetData().size()); + TStringStream encoded; + TCodedOutput co(&encoded, codec, case_.GetBufferLength()); + TransferData(&mi, &co); + co.Flush(); + TStringStream decoded; + TDecodedInput di(&encoded, codec); + TransferData(&di, &decoded); + + Y_VERIFY((case_.GetData().size() > 0) == (encoded.Str().size() > 0)); + Y_VERIFY(case_.GetData() == decoded.Str()); +} + +DEFINE_BINARY_PROTO_FUZZER(const TPackUnpackCase& case_) { + try { + if (!case_.GetCodecName()) { + DoOnlyDecode(case_); + return; + } + + ValidateBufferSize(case_.GetBufferLength()); + if (case_.GetPacked()) { + DoDecodeEncode(case_); + } else { + DoEncodeDecode(case_); + } + } catch (const std::exception&) { + } +} diff --git a/library/cpp/blockcodecs/fuzz/proto/case.proto b/library/cpp/blockcodecs/fuzz/proto/case.proto new file mode 100644 index 00000000000..85518b0da9e --- /dev/null +++ b/library/cpp/blockcodecs/fuzz/proto/case.proto @@ -0,0 +1,10 @@ +syntax="proto3"; + +package NBlockCodecs.NFuzz; + +message TPackUnpackCase { + bool Packed = 1; + uint32 BufferLength = 2; + string CodecName = 3; + bytes Data = 4; +} diff --git a/library/cpp/blockcodecs/fuzz/proto/ya.make b/library/cpp/blockcodecs/fuzz/proto/ya.make new file mode 100644 index 00000000000..da840bc8c93 --- /dev/null +++ b/library/cpp/blockcodecs/fuzz/proto/ya.make @@ -0,0 +1,14 @@ +OWNER( + yazevnul + g:util +) + +PROTO_LIBRARY() + +SRCS( + case.proto +) + +EXCLUDE_TAGS(GO_PROTO) + +END() diff --git a/library/cpp/blockcodecs/fuzz/ya.make b/library/cpp/blockcodecs/fuzz/ya.make new file mode 100644 index 00000000000..bc8becc9e1d --- /dev/null +++ b/library/cpp/blockcodecs/fuzz/ya.make @@ -0,0 +1,23 @@ +OWNER( + pg + g:util +) + +IF (NOT MSVC) + FUZZ() + + SIZE(MEDIUM) + + SRCS( 
+ main.cpp + ) + + PEERDIR( + contrib/libs/protobuf + contrib/libs/protobuf-mutator + library/cpp/blockcodecs + library/cpp/blockcodecs/fuzz/proto + ) + + END() +ENDIF() diff --git a/library/cpp/blockcodecs/stream.cpp b/library/cpp/blockcodecs/stream.cpp new file mode 100644 index 00000000000..65e61e92214 --- /dev/null +++ b/library/cpp/blockcodecs/stream.cpp @@ -0,0 +1 @@ +#include "stream.h" diff --git a/library/cpp/blockcodecs/stream.h b/library/cpp/blockcodecs/stream.h new file mode 100644 index 00000000000..96c479cf7e1 --- /dev/null +++ b/library/cpp/blockcodecs/stream.h @@ -0,0 +1,3 @@ +#pragma once + +#include <library/cpp/blockcodecs/core/stream.h> diff --git a/library/cpp/blockcodecs/ut/ya.make b/library/cpp/blockcodecs/ut/ya.make new file mode 100644 index 00000000000..25b882c15b2 --- /dev/null +++ b/library/cpp/blockcodecs/ut/ya.make @@ -0,0 +1,19 @@ +UNITTEST_FOR(library/cpp/blockcodecs) + +OWNER(pg) + +FORK_TESTS() + +FORK_SUBTESTS() + +SPLIT_FACTOR(40) + +TIMEOUT(300) + +SIZE(MEDIUM) + +SRCS( + codecs_ut.cpp +) + +END() diff --git a/library/cpp/blockcodecs/ya.make b/library/cpp/blockcodecs/ya.make new file mode 100644 index 00000000000..b8f03d5421c --- /dev/null +++ b/library/cpp/blockcodecs/ya.make @@ -0,0 +1,27 @@ +LIBRARY() + +OWNER(pg) + +PEERDIR( + library/cpp/blockcodecs/core + library/cpp/blockcodecs/codecs/brotli + library/cpp/blockcodecs/codecs/bzip + library/cpp/blockcodecs/codecs/fastlz + library/cpp/blockcodecs/codecs/legacy_zstd06 + library/cpp/blockcodecs/codecs/lz4 + library/cpp/blockcodecs/codecs/lzma + library/cpp/blockcodecs/codecs/snappy + library/cpp/blockcodecs/codecs/zlib + library/cpp/blockcodecs/codecs/zstd +) + +SRCS( + codecs.cpp + stream.cpp +) + +END() + +RECURSE_FOR_TESTS( + ut +) |