aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/blockcodecs
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/blockcodecs
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/blockcodecs')
-rw-r--r--library/cpp/blockcodecs/README.md23
-rw-r--r--library/cpp/blockcodecs/codecs.cpp1
-rw-r--r--library/cpp/blockcodecs/codecs.h3
-rw-r--r--library/cpp/blockcodecs/codecs/brotli/brotli.cpp67
-rw-r--r--library/cpp/blockcodecs/codecs/brotli/ya.make15
-rw-r--r--library/cpp/blockcodecs/codecs/bzip/bzip.cpp62
-rw-r--r--library/cpp/blockcodecs/codecs/bzip/ya.make14
-rw-r--r--library/cpp/blockcodecs/codecs/fastlz/fastlz.cpp54
-rw-r--r--library/cpp/blockcodecs/codecs/fastlz/ya.make14
-rw-r--r--library/cpp/blockcodecs/codecs/legacy_zstd06/legacy_zstd06.cpp58
-rw-r--r--library/cpp/blockcodecs/codecs/legacy_zstd06/ya.make14
-rw-r--r--library/cpp/blockcodecs/codecs/lz4/lz4.cpp123
-rw-r--r--library/cpp/blockcodecs/codecs/lz4/ya.make15
-rw-r--r--library/cpp/blockcodecs/codecs/lzma/lzma.cpp74
-rw-r--r--library/cpp/blockcodecs/codecs/lzma/ya.make14
-rw-r--r--library/cpp/blockcodecs/codecs/snappy/snappy.cpp52
-rw-r--r--library/cpp/blockcodecs/codecs/snappy/ya.make14
-rw-r--r--library/cpp/blockcodecs/codecs/zlib/ya.make14
-rw-r--r--library/cpp/blockcodecs/codecs/zlib/zlib.cpp64
-rw-r--r--library/cpp/blockcodecs/codecs/zstd/ya.make14
-rw-r--r--library/cpp/blockcodecs/codecs/zstd/zstd.cpp59
-rw-r--r--library/cpp/blockcodecs/codecs_ut.cpp340
-rw-r--r--library/cpp/blockcodecs/core/codecs.cpp148
-rw-r--r--library/cpp/blockcodecs/core/codecs.h90
-rw-r--r--library/cpp/blockcodecs/core/common.h105
-rw-r--r--library/cpp/blockcodecs/core/register.h10
-rw-r--r--library/cpp/blockcodecs/core/stream.cpp212
-rw-r--r--library/cpp/blockcodecs/core/stream.h46
-rw-r--r--library/cpp/blockcodecs/core/ya.make10
-rw-r--r--library/cpp/blockcodecs/fuzz/main.cpp84
-rw-r--r--library/cpp/blockcodecs/fuzz/proto/case.proto10
-rw-r--r--library/cpp/blockcodecs/fuzz/proto/ya.make14
-rw-r--r--library/cpp/blockcodecs/fuzz/ya.make23
-rw-r--r--library/cpp/blockcodecs/stream.cpp1
-rw-r--r--library/cpp/blockcodecs/stream.h3
-rw-r--r--library/cpp/blockcodecs/ut/ya.make19
-rw-r--r--library/cpp/blockcodecs/ya.make27
37 files changed, 1910 insertions, 0 deletions
diff --git a/library/cpp/blockcodecs/README.md b/library/cpp/blockcodecs/README.md
new file mode 100644
index 00000000000..417917a475b
--- /dev/null
+++ b/library/cpp/blockcodecs/README.md
@@ -0,0 +1,23 @@
+This is a simple library for block data compression (this means data is compressed/uncompressed
+by whole blocks in memory). It's a lite-version of the `library/cpp/codecs`. Lite here means that it
+provide only well-known compression algorithms, without the possibility of learning.
+
+There are two possible ways to work with it.
+
+Codec by name
+=============
+Use `NBlockCodec::Codec` to obtain the codec by name. The codec can be asked to compress
+or decompress something and in various ways.
+
+To get a full list of codecs there is a function `NBlockCodecs::ListAllCodecs()`.
+
+Streaming
+=========
+Use `stream.h` to obtain simple streams over block codecs (buffer data, compress them by blocks,
+write to the resulting stream).
+
+Using codec plugins
+===================
+If you don't want your code to bloat from unused codecs, you can use the small version of the
+library: `library/cpp/blockcodecs/core`. In that case, you need to manually set `PEERDIR()`s to
+needed codecs (i.e. `PEERDIR(library/cpp/blockcodecs/codecs/lzma)`).
diff --git a/library/cpp/blockcodecs/codecs.cpp b/library/cpp/blockcodecs/codecs.cpp
new file mode 100644
index 00000000000..fdec4809d37
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs.cpp
@@ -0,0 +1 @@
+#include "codecs.h"
diff --git a/library/cpp/blockcodecs/codecs.h b/library/cpp/blockcodecs/codecs.h
new file mode 100644
index 00000000000..fd499b54b0d
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs.h
@@ -0,0 +1,3 @@
+#pragma once
+
+#include <library/cpp/blockcodecs/core/codecs.h>
diff --git a/library/cpp/blockcodecs/codecs/brotli/brotli.cpp b/library/cpp/blockcodecs/codecs/brotli/brotli.cpp
new file mode 100644
index 00000000000..6e3cd971bdc
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/brotli/brotli.cpp
@@ -0,0 +1,67 @@
+#include <library/cpp/blockcodecs/core/codecs.h>
+#include <library/cpp/blockcodecs/core/common.h>
+#include <library/cpp/blockcodecs/core/register.h>
+
+#include <contrib/libs/brotli/include/brotli/encode.h>
+#include <contrib/libs/brotli/include/brotli/decode.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+ struct TBrotliCodec : public TAddLengthCodec<TBrotliCodec> {
+ static constexpr int BEST_QUALITY = 11;
+
+ inline TBrotliCodec(ui32 level)
+ : Quality(level)
+ , MyName(TStringBuf("brotli_") + ToString(level))
+ {
+ }
+
+ static inline size_t DoMaxCompressedLength(size_t l) noexcept {
+ return BrotliEncoderMaxCompressedSize(l);
+ }
+
+ inline size_t DoCompress(const TData& in, void* out) const {
+ size_t resultSize = MaxCompressedLength(in);
+ auto result = BrotliEncoderCompress(
+ /*quality*/ Quality,
+ /*window*/ BROTLI_DEFAULT_WINDOW,
+ /*mode*/ BrotliEncoderMode::BROTLI_MODE_GENERIC,
+ /*input_size*/ in.size(),
+ /*input_buffer*/ (const unsigned char*)(in.data()),
+ /*encoded_size*/ &resultSize,
+ /*encoded_buffer*/ static_cast<unsigned char*>(out));
+ if (result != BROTLI_TRUE) {
+ ythrow yexception() << "internal brotli error during compression";
+ }
+
+ return resultSize;
+ }
+
+ inline void DoDecompress(const TData& in, void* out, size_t dsize) const {
+ size_t decoded = dsize;
+ auto result = BrotliDecoderDecompress(in.size(), (const unsigned char*)in.data(), &decoded, static_cast<unsigned char*>(out));
+ if (result != BROTLI_DECODER_RESULT_SUCCESS) {
+ ythrow yexception() << "internal brotli error during decompression";
+ } else if (decoded != dsize) {
+ ythrow TDecompressError(dsize, decoded);
+ }
+ }
+
+ TStringBuf Name() const noexcept override {
+ return MyName;
+ }
+
+ const int Quality = BEST_QUALITY;
+ const TString MyName;
+ };
+
+ struct TBrotliRegistrar {
+ TBrotliRegistrar() {
+ for (int i = 1; i <= TBrotliCodec::BEST_QUALITY; ++i) {
+ RegisterCodec(MakeHolder<TBrotliCodec>(i));
+ }
+ }
+ };
+ const TBrotliRegistrar Registrar{};
+}
diff --git a/library/cpp/blockcodecs/codecs/brotli/ya.make b/library/cpp/blockcodecs/codecs/brotli/ya.make
new file mode 100644
index 00000000000..17aff0bb722
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/brotli/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+OWNER(pg)
+
+PEERDIR(
+ contrib/libs/brotli/enc
+ contrib/libs/brotli/dec
+ library/cpp/blockcodecs/core
+)
+
+SRCS(
+ GLOBAL brotli.cpp
+)
+
+END()
diff --git a/library/cpp/blockcodecs/codecs/bzip/bzip.cpp b/library/cpp/blockcodecs/codecs/bzip/bzip.cpp
new file mode 100644
index 00000000000..3a5cfdd0e9a
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/bzip/bzip.cpp
@@ -0,0 +1,62 @@
+#include <library/cpp/blockcodecs/core/codecs.h>
+#include <library/cpp/blockcodecs/core/common.h>
+#include <library/cpp/blockcodecs/core/register.h>
+
+#include <contrib/libs/libbz2/bzlib.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+ struct TBZipCodec: public TAddLengthCodec<TBZipCodec> {
+ inline TBZipCodec(int level)
+ : Level(level)
+ , MyName("bzip2-" + ToString(Level))
+ {
+ }
+
+ static inline size_t DoMaxCompressedLength(size_t in) noexcept {
+ // very strange
+ return in * 2 + 128;
+ }
+
+ TStringBuf Name() const noexcept override {
+ return MyName;
+ }
+
+ inline size_t DoCompress(const TData& in, void* buf) const {
+ unsigned int ret = DoMaxCompressedLength(in.size());
+ const int res = BZ2_bzBuffToBuffCompress((char*)buf, &ret, (char*)in.data(), in.size(), Level, 0, 0);
+ if (res != BZ_OK) {
+ ythrow TCompressError(res);
+ }
+
+ return ret;
+ }
+
+ inline void DoDecompress(const TData& in, void* out, size_t len) const {
+ unsigned int tmp = SafeIntegerCast<unsigned int>(len);
+ const int res = BZ2_bzBuffToBuffDecompress((char*)out, &tmp, (char*)in.data(), in.size(), 0, 0);
+
+ if (res != BZ_OK) {
+ ythrow TDecompressError(res);
+ }
+
+ if (len != tmp) {
+ ythrow TDecompressError(len, tmp);
+ }
+ }
+
+ const int Level;
+ const TString MyName;
+ };
+
+ struct TBZipRegistrar {
+ TBZipRegistrar() {
+ for (int i = 1; i < 10; ++i) {
+ RegisterCodec(MakeHolder<TBZipCodec>(i));
+ }
+ RegisterAlias("bzip2", "bzip2-6");
+ }
+ };
+ const TBZipRegistrar Registrar{};
+}
diff --git a/library/cpp/blockcodecs/codecs/bzip/ya.make b/library/cpp/blockcodecs/codecs/bzip/ya.make
new file mode 100644
index 00000000000..f0a8aefd62d
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/bzip/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+OWNER(pg)
+
+PEERDIR(
+ contrib/libs/libbz2
+ library/cpp/blockcodecs/core
+)
+
+SRCS(
+ GLOBAL bzip.cpp
+)
+
+END()
diff --git a/library/cpp/blockcodecs/codecs/fastlz/fastlz.cpp b/library/cpp/blockcodecs/codecs/fastlz/fastlz.cpp
new file mode 100644
index 00000000000..da2831fbd24
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/fastlz/fastlz.cpp
@@ -0,0 +1,54 @@
+#include <library/cpp/blockcodecs/core/codecs.h>
+#include <library/cpp/blockcodecs/core/common.h>
+#include <library/cpp/blockcodecs/core/register.h>
+
+#include <contrib/libs/fastlz/fastlz.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+ struct TFastLZCodec: public TAddLengthCodec<TFastLZCodec> {
+ inline TFastLZCodec(int level)
+ : MyName("fastlz-" + ToString(level))
+ , Level(level)
+ {
+ }
+
+ static inline size_t DoMaxCompressedLength(size_t in) noexcept {
+ return Max<size_t>(in + in / 20, 128);
+ }
+
+ TStringBuf Name() const noexcept override {
+ return MyName;
+ }
+
+ inline size_t DoCompress(const TData& in, void* buf) const {
+ if (Level) {
+ return fastlz_compress_level(Level, in.data(), in.size(), buf);
+ }
+
+ return fastlz_compress(in.data(), in.size(), buf);
+ }
+
+ inline void DoDecompress(const TData& in, void* out, size_t len) const {
+ const int ret = fastlz_decompress(in.data(), in.size(), out, len);
+
+ if (ret < 0 || (size_t)ret != len) {
+ ythrow TDataError() << TStringBuf("can not decompress");
+ }
+ }
+
+ const TString MyName;
+ const int Level;
+ };
+
+ struct TFastLZRegistrar {
+ TFastLZRegistrar() {
+ for (int i = 0; i < 3; ++i) {
+ RegisterCodec(MakeHolder<TFastLZCodec>(i));
+ }
+ RegisterAlias("fastlz", "fastlz-0");
+ }
+ };
+ const TFastLZRegistrar Registrar{};
+}
diff --git a/library/cpp/blockcodecs/codecs/fastlz/ya.make b/library/cpp/blockcodecs/codecs/fastlz/ya.make
new file mode 100644
index 00000000000..59c09b329b3
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/fastlz/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+OWNER(pg)
+
+PEERDIR(
+ contrib/libs/fastlz
+ library/cpp/blockcodecs/core
+)
+
+SRCS(
+ GLOBAL fastlz.cpp
+)
+
+END()
diff --git a/library/cpp/blockcodecs/codecs/legacy_zstd06/legacy_zstd06.cpp b/library/cpp/blockcodecs/codecs/legacy_zstd06/legacy_zstd06.cpp
new file mode 100644
index 00000000000..042f031679c
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/legacy_zstd06/legacy_zstd06.cpp
@@ -0,0 +1,58 @@
+#include <library/cpp/blockcodecs/core/codecs.h>
+#include <library/cpp/blockcodecs/core/common.h>
+#include <library/cpp/blockcodecs/core/register.h>
+
+#include <contrib/libs/zstd06/common/zstd.h>
+#include <contrib/libs/zstd06/common/zstd_static.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+ struct TZStd06Codec: public TAddLengthCodec<TZStd06Codec> {
+ inline TZStd06Codec(unsigned level)
+ : Level(level)
+ , MyName(TStringBuf("zstd06_") + ToString(Level))
+ {
+ }
+
+ static inline size_t CheckError(size_t ret, const char* what) {
+ if (ZSTD_isError(ret)) {
+ ythrow yexception() << what << TStringBuf(" zstd error: ") << ZSTD_getErrorName(ret);
+ }
+
+ return ret;
+ }
+
+ static inline size_t DoMaxCompressedLength(size_t l) noexcept {
+ return ZSTD_compressBound(l);
+ }
+
+ inline size_t DoCompress(const TData& in, void* out) const {
+ return CheckError(ZSTD_compress(out, DoMaxCompressedLength(in.size()), in.data(), in.size(), Level), "compress");
+ }
+
+ inline void DoDecompress(const TData& in, void* out, size_t dsize) const {
+ const size_t res = CheckError(ZSTD_decompress(out, dsize, in.data(), in.size()), "decompress");
+
+ if (res != dsize) {
+ ythrow TDecompressError(dsize, res);
+ }
+ }
+
+ TStringBuf Name() const noexcept override {
+ return MyName;
+ }
+
+ const unsigned Level;
+ const TString MyName;
+ };
+
+ struct TZStd06Registrar {
+ TZStd06Registrar() {
+ for (unsigned i = 1; i <= ZSTD_maxCLevel(); ++i) {
+ RegisterCodec(MakeHolder<TZStd06Codec>(i));
+ }
+ }
+ };
+ const TZStd06Registrar Registrar{};
+}
diff --git a/library/cpp/blockcodecs/codecs/legacy_zstd06/ya.make b/library/cpp/blockcodecs/codecs/legacy_zstd06/ya.make
new file mode 100644
index 00000000000..067f7312330
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/legacy_zstd06/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+OWNER(pg)
+
+PEERDIR(
+ contrib/libs/zstd06
+ library/cpp/blockcodecs/core
+)
+
+SRCS(
+ GLOBAL legacy_zstd06.cpp
+)
+
+END()
diff --git a/library/cpp/blockcodecs/codecs/lz4/lz4.cpp b/library/cpp/blockcodecs/codecs/lz4/lz4.cpp
new file mode 100644
index 00000000000..fbf0fe110f1
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/lz4/lz4.cpp
@@ -0,0 +1,123 @@
+#include <library/cpp/blockcodecs/core/codecs.h>
+#include <library/cpp/blockcodecs/core/common.h>
+#include <library/cpp/blockcodecs/core/register.h>
+
+#include <contrib/libs/lz4/lz4.h>
+#include <contrib/libs/lz4/lz4hc.h>
+#include <contrib/libs/lz4/generated/iface.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+ struct TLz4Base {
+ static inline size_t DoMaxCompressedLength(size_t in) {
+ return LZ4_compressBound(SafeIntegerCast<int>(in));
+ }
+ };
+
+ struct TLz4FastCompress {
+ inline TLz4FastCompress(int memory)
+ : Memory(memory)
+ , Methods(LZ4Methods(Memory))
+ {
+ }
+
+ inline size_t DoCompress(const TData& in, void* buf) const {
+ return Methods->LZ4CompressLimited(in.data(), (char*)buf, in.size(), LZ4_compressBound(in.size()));
+ }
+
+ inline TString CPrefix() {
+ return "fast" + ToString(Memory);
+ }
+
+ const int Memory;
+ const TLZ4Methods* Methods;
+ };
+
+ struct TLz4BestCompress {
+ inline size_t DoCompress(const TData& in, void* buf) const {
+ return LZ4_compress_HC(in.data(), (char*)buf, in.size(), LZ4_compressBound(in.size()), 0);
+ }
+
+ static inline TString CPrefix() {
+ return "hc";
+ }
+ };
+
+ struct TLz4FastDecompress {
+ inline void DoDecompress(const TData& in, void* out, size_t len) const {
+ ssize_t res = LZ4_decompress_fast(in.data(), (char*)out, len);
+ if (res < 0) {
+ ythrow TDecompressError(res);
+ }
+ }
+
+ static inline TStringBuf DPrefix() {
+ return TStringBuf("fast");
+ }
+ };
+
+ struct TLz4SafeDecompress {
+ inline void DoDecompress(const TData& in, void* out, size_t len) const {
+ ssize_t res = LZ4_decompress_safe(in.data(), (char*)out, in.size(), len);
+ if (res < 0) {
+ ythrow TDecompressError(res);
+ }
+ }
+
+ static inline TStringBuf DPrefix() {
+ return TStringBuf("safe");
+ }
+ };
+
+ template <class TC, class TD>
+ struct TLz4Codec: public TAddLengthCodec<TLz4Codec<TC, TD>>, public TLz4Base, public TC, public TD {
+ inline TLz4Codec()
+ : MyName("lz4-" + TC::CPrefix() + "-" + TD::DPrefix())
+ {
+ }
+
+ template <class T>
+ inline TLz4Codec(const T& t)
+ : TC(t)
+ , MyName("lz4-" + TC::CPrefix() + "-" + TD::DPrefix())
+ {
+ }
+
+ TStringBuf Name() const noexcept override {
+ return MyName;
+ }
+
+ const TString MyName;
+ };
+
+ struct TLz4Registrar {
+ TLz4Registrar() {
+ for (int i = 0; i < 30; ++i) {
+ typedef TLz4Codec<TLz4FastCompress, TLz4FastDecompress> T1;
+ typedef TLz4Codec<TLz4FastCompress, TLz4SafeDecompress> T2;
+
+ THolder<T1> t1(new T1(i));
+ THolder<T2> t2(new T2(i));
+
+ if (t1->Methods) {
+ RegisterCodec(std::move(t1));
+ }
+
+ if (t2->Methods) {
+ RegisterCodec(std::move(t2));
+ }
+ }
+
+ RegisterCodec(MakeHolder<TLz4Codec<TLz4BestCompress, TLz4FastDecompress>>());
+ RegisterCodec(MakeHolder<TLz4Codec<TLz4BestCompress, TLz4SafeDecompress>>());
+
+ RegisterAlias("lz4-fast-safe", "lz4-fast14-safe");
+ RegisterAlias("lz4-fast-fast", "lz4-fast14-fast");
+ RegisterAlias("lz4", "lz4-fast-safe");
+ RegisterAlias("lz4fast", "lz4-fast-fast");
+ RegisterAlias("lz4hc", "lz4-hc-safe");
+ }
+ };
+ const TLz4Registrar Registrar{};
+}
diff --git a/library/cpp/blockcodecs/codecs/lz4/ya.make b/library/cpp/blockcodecs/codecs/lz4/ya.make
new file mode 100644
index 00000000000..f2471d7d96f
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/lz4/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+OWNER(pg)
+
+PEERDIR(
+ contrib/libs/lz4
+ contrib/libs/lz4/generated
+ library/cpp/blockcodecs/core
+)
+
+SRCS(
+ GLOBAL lz4.cpp
+)
+
+END()
diff --git a/library/cpp/blockcodecs/codecs/lzma/lzma.cpp b/library/cpp/blockcodecs/codecs/lzma/lzma.cpp
new file mode 100644
index 00000000000..6c8d5fded42
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/lzma/lzma.cpp
@@ -0,0 +1,74 @@
+#include <library/cpp/blockcodecs/core/codecs.h>
+#include <library/cpp/blockcodecs/core/common.h>
+#include <library/cpp/blockcodecs/core/register.h>
+
+#include <contrib/libs/lzmasdk/LzmaLib.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+ struct TLzmaCodec: public TAddLengthCodec<TLzmaCodec> {
+ inline TLzmaCodec(int level)
+ : Level(level)
+ , MyName("lzma-" + ToString(Level))
+ {
+ }
+
+ static inline size_t DoMaxCompressedLength(size_t in) noexcept {
+ return Max<size_t>(in + in / 20, 128) + LZMA_PROPS_SIZE;
+ }
+
+ TStringBuf Name() const noexcept override {
+ return MyName;
+ }
+
+ inline size_t DoCompress(const TData& in, void* buf) const {
+ unsigned char* props = (unsigned char*)buf;
+ unsigned char* data = props + LZMA_PROPS_SIZE;
+ size_t destLen = Max<size_t>();
+ size_t outPropsSize = LZMA_PROPS_SIZE;
+
+ const int ret = LzmaCompress(data, &destLen, (const unsigned char*)in.data(), in.size(), props, &outPropsSize, Level, 0, -1, -1, -1, -1, -1);
+
+ if (ret != SZ_OK) {
+ ythrow TCompressError(ret);
+ }
+
+ return destLen + LZMA_PROPS_SIZE;
+ }
+
+ inline void DoDecompress(const TData& in, void* out, size_t len) const {
+ if (in.size() <= LZMA_PROPS_SIZE) {
+ ythrow TDataError() << TStringBuf("broken lzma stream");
+ }
+
+ const unsigned char* props = (const unsigned char*)in.data();
+ const unsigned char* data = props + LZMA_PROPS_SIZE;
+ size_t destLen = len;
+ SizeT srcLen = in.size() - LZMA_PROPS_SIZE;
+
+ const int res = LzmaUncompress((unsigned char*)out, &destLen, data, &srcLen, props, LZMA_PROPS_SIZE);
+
+ if (res != SZ_OK) {
+ ythrow TDecompressError(res);
+ }
+
+ if (destLen != len) {
+ ythrow TDecompressError(len, destLen);
+ }
+ }
+
+ const int Level;
+ const TString MyName;
+ };
+
+ struct TLzmaRegistrar {
+ TLzmaRegistrar() {
+ for (int i = 0; i < 10; ++i) {
+ RegisterCodec(MakeHolder<TLzmaCodec>(i));
+ }
+ RegisterAlias("lzma", "lzma-5");
+ }
+ };
+ const TLzmaRegistrar Registrar{};
+}
diff --git a/library/cpp/blockcodecs/codecs/lzma/ya.make b/library/cpp/blockcodecs/codecs/lzma/ya.make
new file mode 100644
index 00000000000..e145834da68
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/lzma/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+OWNER(pg)
+
+PEERDIR(
+ contrib/libs/lzmasdk
+ library/cpp/blockcodecs/core
+)
+
+SRCS(
+ GLOBAL lzma.cpp
+)
+
+END()
diff --git a/library/cpp/blockcodecs/codecs/snappy/snappy.cpp b/library/cpp/blockcodecs/codecs/snappy/snappy.cpp
new file mode 100644
index 00000000000..f6be05a05fc
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/snappy/snappy.cpp
@@ -0,0 +1,52 @@
+#include <library/cpp/blockcodecs/core/codecs.h>
+#include <library/cpp/blockcodecs/core/common.h>
+#include <library/cpp/blockcodecs/core/register.h>
+
+#include <contrib/libs/snappy/snappy.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+ struct TSnappyCodec: public ICodec {
+ size_t DecompressedLength(const TData& in) const override {
+ size_t ret;
+
+ if (snappy::GetUncompressedLength(in.data(), in.size(), &ret)) {
+ return ret;
+ }
+
+ ythrow TDecompressError(0);
+ }
+
+ size_t MaxCompressedLength(const TData& in) const override {
+ return snappy::MaxCompressedLength(in.size());
+ }
+
+ size_t Compress(const TData& in, void* out) const override {
+ size_t ret;
+
+ snappy::RawCompress(in.data(), in.size(), (char*)out, &ret);
+
+ return ret;
+ }
+
+ size_t Decompress(const TData& in, void* out) const override {
+ if (snappy::RawUncompress(in.data(), in.size(), (char*)out)) {
+ return DecompressedLength(in);
+ }
+
+ ythrow TDecompressError(0);
+ }
+
+ TStringBuf Name() const noexcept override {
+ return "snappy";
+ }
+ };
+
+ struct TSnappyRegistrar {
+ TSnappyRegistrar() {
+ RegisterCodec(MakeHolder<TSnappyCodec>());
+ }
+ };
+ const TSnappyRegistrar Registrar{};
+}
diff --git a/library/cpp/blockcodecs/codecs/snappy/ya.make b/library/cpp/blockcodecs/codecs/snappy/ya.make
new file mode 100644
index 00000000000..0cf2be2f944
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/snappy/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+OWNER(pg)
+
+PEERDIR(
+ contrib/libs/snappy
+ library/cpp/blockcodecs/core
+)
+
+SRCS(
+ GLOBAL snappy.cpp
+)
+
+END()
diff --git a/library/cpp/blockcodecs/codecs/zlib/ya.make b/library/cpp/blockcodecs/codecs/zlib/ya.make
new file mode 100644
index 00000000000..9f04995f667
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/zlib/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+OWNER(pg)
+
+PEERDIR(
+ contrib/libs/zlib
+ library/cpp/blockcodecs/core
+)
+
+SRCS(
+ GLOBAL zlib.cpp
+)
+
+END()
diff --git a/library/cpp/blockcodecs/codecs/zlib/zlib.cpp b/library/cpp/blockcodecs/codecs/zlib/zlib.cpp
new file mode 100644
index 00000000000..cdb556c36d4
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/zlib/zlib.cpp
@@ -0,0 +1,64 @@
+#include <library/cpp/blockcodecs/core/codecs.h>
+#include <library/cpp/blockcodecs/core/common.h>
+#include <library/cpp/blockcodecs/core/register.h>
+
+#include <contrib/libs/zlib/zlib.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+ struct TZLibCodec: public TAddLengthCodec<TZLibCodec> {
+ inline TZLibCodec(int level)
+ : MyName("zlib-" + ToString(level))
+ , Level(level)
+ {
+ }
+
+ static inline size_t DoMaxCompressedLength(size_t in) noexcept {
+ return compressBound(in);
+ }
+
+ TStringBuf Name() const noexcept override {
+ return MyName;
+ }
+
+ inline size_t DoCompress(const TData& in, void* buf) const {
+ //TRASH detected
+ uLong ret = Max<unsigned int>();
+
+ int cres = compress2((Bytef*)buf, &ret, (const Bytef*)in.data(), in.size(), Level);
+
+ if (cres != Z_OK) {
+ ythrow TCompressError(cres);
+ }
+
+ return ret;
+ }
+
+ inline void DoDecompress(const TData& in, void* out, size_t len) const {
+ uLong ret = len;
+
+ int uncres = uncompress((Bytef*)out, &ret, (const Bytef*)in.data(), in.size());
+ if (uncres != Z_OK) {
+ ythrow TDecompressError(uncres);
+ }
+
+ if (ret != len) {
+ ythrow TDecompressError(len, ret);
+ }
+ }
+
+ const TString MyName;
+ const int Level;
+ };
+
+ struct TZLibRegistrar {
+ TZLibRegistrar() {
+ for (int i = 0; i < 10; ++i) {
+ RegisterCodec(MakeHolder<TZLibCodec>(i));
+ }
+ RegisterAlias("zlib", "zlib-6");
+ }
+ };
+ const TZLibRegistrar Registrar{};
+}
diff --git a/library/cpp/blockcodecs/codecs/zstd/ya.make b/library/cpp/blockcodecs/codecs/zstd/ya.make
new file mode 100644
index 00000000000..c077dd47b74
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/zstd/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+OWNER(pg)
+
+PEERDIR(
+ contrib/libs/zstd
+ library/cpp/blockcodecs/core
+)
+
+SRCS(
+ GLOBAL zstd.cpp
+)
+
+END()
diff --git a/library/cpp/blockcodecs/codecs/zstd/zstd.cpp b/library/cpp/blockcodecs/codecs/zstd/zstd.cpp
new file mode 100644
index 00000000000..95299b3f6d3
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs/zstd/zstd.cpp
@@ -0,0 +1,59 @@
+#include <library/cpp/blockcodecs/core/codecs.h>
+#include <library/cpp/blockcodecs/core/common.h>
+#include <library/cpp/blockcodecs/core/register.h>
+
+#define ZSTD_STATIC_LINKING_ONLY
+#include <contrib/libs/zstd/include/zstd.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+ struct TZStd08Codec: public TAddLengthCodec<TZStd08Codec> {
+ inline TZStd08Codec(unsigned level)
+ : Level(level)
+ , MyName(TStringBuf("zstd08_") + ToString(Level))
+ {
+ }
+
+ static inline size_t CheckError(size_t ret, const char* what) {
+ if (ZSTD_isError(ret)) {
+ ythrow yexception() << what << TStringBuf(" zstd error: ") << ZSTD_getErrorName(ret);
+ }
+
+ return ret;
+ }
+
+ static inline size_t DoMaxCompressedLength(size_t l) noexcept {
+ return ZSTD_compressBound(l);
+ }
+
+ inline size_t DoCompress(const TData& in, void* out) const {
+ return CheckError(ZSTD_compress(out, DoMaxCompressedLength(in.size()), in.data(), in.size(), Level), "compress");
+ }
+
+ inline void DoDecompress(const TData& in, void* out, size_t dsize) const {
+ const size_t res = CheckError(ZSTD_decompress(out, dsize, in.data(), in.size()), "decompress");
+
+ if (res != dsize) {
+ ythrow TDecompressError(dsize, res);
+ }
+ }
+
+ TStringBuf Name() const noexcept override {
+ return MyName;
+ }
+
+ const unsigned Level;
+ const TString MyName;
+ };
+
+ struct TZStd08Registrar {
+ TZStd08Registrar() {
+ for (int i = 1; i <= ZSTD_maxCLevel(); ++i) {
+ RegisterCodec(MakeHolder<TZStd08Codec>(i));
+ RegisterAlias("zstd_" + ToString(i), "zstd08_" + ToString(i));
+ }
+ }
+ };
+ const TZStd08Registrar Registrar{};
+}
diff --git a/library/cpp/blockcodecs/codecs_ut.cpp b/library/cpp/blockcodecs/codecs_ut.cpp
new file mode 100644
index 00000000000..bfe5a236909
--- /dev/null
+++ b/library/cpp/blockcodecs/codecs_ut.cpp
@@ -0,0 +1,340 @@
+#include "codecs.h"
+#include "stream.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/stream/str.h>
+#include <util/string/join.h>
+#include <util/digest/multi.h>
+
+Y_UNIT_TEST_SUITE(TBlockCodecsTest) {
+ using namespace NBlockCodecs;
+
+ TBuffer Buffer(TStringBuf b) {
+ TBuffer bb;
+ bb.Assign(b.data(), b.size());
+ return bb;
+ }
+
+ void TestAllAtOnce(size_t n, size_t m) {
+ TVector<TBuffer> datas;
+
+ datas.emplace_back();
+ datas.push_back(Buffer("na gorshke sidel korol"));
+ datas.push_back(Buffer(TStringBuf("", 1)));
+ datas.push_back(Buffer(" "));
+ datas.push_back(Buffer(" "));
+ datas.push_back(Buffer(" "));
+ datas.push_back(Buffer(" "));
+
+ {
+ TStringStream data;
+
+ for (size_t i = 0; i < 1024; ++i) {
+ data << " " << i;
+ }
+
+ datas.push_back(Buffer(data.Str()));
+ }
+
+ TCodecList lst = ListAllCodecs();
+
+ for (size_t i = 0; i < lst.size(); ++i) {
+ const ICodec* c = Codec(lst[i]);
+ const auto h = MultiHash(c->Name(), i, 1);
+
+ if (h % n == m) {
+ } else {
+ continue;
+ }
+
+ for (size_t j = 0; j < datas.size(); ++j) {
+ const TBuffer& data = datas[j];
+ TString res;
+
+ try {
+ TBuffer e, d;
+ c->Encode(data, e);
+ c->Decode(e, d);
+ d.AsString(res);
+ UNIT_ASSERT_EQUAL(NBlockCodecs::TData(res), NBlockCodecs::TData(data));
+ } catch (...) {
+ Cerr << c->Name() << "(" << res.Quote() << ")(" << TString{NBlockCodecs::TData(data)}.Quote() << ")" << Endl;
+
+ throw;
+ }
+ }
+ }
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce0) {
+ TestAllAtOnce(20, 0);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce1) {
+ TestAllAtOnce(20, 1);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce2) {
+ TestAllAtOnce(20, 2);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce3) {
+ TestAllAtOnce(20, 3);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce4) {
+ TestAllAtOnce(20, 4);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce5) {
+ TestAllAtOnce(20, 5);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce6) {
+ TestAllAtOnce(20, 6);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce7) {
+ TestAllAtOnce(20, 7);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce8) {
+ TestAllAtOnce(20, 8);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce9) {
+ TestAllAtOnce(20, 9);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce10) {
+ TestAllAtOnce(20, 10);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce12) {
+ TestAllAtOnce(20, 12);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce13) {
+ TestAllAtOnce(20, 13);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce14) {
+ TestAllAtOnce(20, 14);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce15) {
+ TestAllAtOnce(20, 15);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce16) {
+ TestAllAtOnce(20, 16);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce17) {
+ TestAllAtOnce(20, 17);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce18) {
+ TestAllAtOnce(20, 18);
+ }
+
+ Y_UNIT_TEST(TestAllAtOnce19) {
+ TestAllAtOnce(20, 19);
+ }
+
+ void TestStreams(size_t n, size_t m) {
+ TVector<TString> datas;
+ TString res;
+
+ for (size_t i = 0; i < 256; ++i) {
+ datas.push_back(TString(i, (char)(i % 128)));
+ }
+
+ for (size_t i = 0; i < datas.size(); ++i) {
+ res += datas[i];
+ }
+
+ TCodecList lst = ListAllCodecs();
+
+ for (size_t i = 0; i < lst.size(); ++i) {
+ TStringStream ss;
+
+ const ICodec* c = Codec(lst[i]);
+ const auto h = MultiHash(c->Name(), i, 2);
+
+ if (h % n == m) {
+ } else {
+ continue;
+ }
+
+ {
+ TCodedOutput out(&ss, c, 1234);
+
+ for (size_t j = 0; j < datas.size(); ++j) {
+ out << datas[j];
+ }
+
+ out.Finish();
+ }
+
+ const TString resNew = TDecodedInput(&ss).ReadAll();
+
+ try {
+ UNIT_ASSERT_EQUAL(resNew, res);
+ } catch (...) {
+ Cerr << c->Name() << Endl;
+
+ throw;
+ }
+ }
+ }
+
+ Y_UNIT_TEST(TestStreams0) {
+ TestStreams(20, 0);
+ }
+
+ Y_UNIT_TEST(TestStreams1) {
+ TestStreams(20, 1);
+ }
+
+ Y_UNIT_TEST(TestStreams2) {
+ TestStreams(20, 2);
+ }
+
+ Y_UNIT_TEST(TestStreams3) {
+ TestStreams(20, 3);
+ }
+
+ Y_UNIT_TEST(TestStreams4) {
+ TestStreams(20, 4);
+ }
+
+ Y_UNIT_TEST(TestStreams5) {
+ TestStreams(20, 5);
+ }
+
+ Y_UNIT_TEST(TestStreams6) {
+ TestStreams(20, 6);
+ }
+
+ Y_UNIT_TEST(TestStreams7) {
+ TestStreams(20, 7);
+ }
+
+ Y_UNIT_TEST(TestStreams8) {
+ TestStreams(20, 8);
+ }
+
+ Y_UNIT_TEST(TestStreams9) {
+ TestStreams(20, 9);
+ }
+
+ Y_UNIT_TEST(TestStreams10) {
+ TestStreams(20, 10);
+ }
+
+ Y_UNIT_TEST(TestStreams11) {
+ TestStreams(20, 11);
+ }
+
+ Y_UNIT_TEST(TestStreams12) {
+ TestStreams(20, 12);
+ }
+
+ Y_UNIT_TEST(TestStreams13) {
+ TestStreams(20, 13);
+ }
+
+ Y_UNIT_TEST(TestStreams14) {
+ TestStreams(20, 14);
+ }
+
+ Y_UNIT_TEST(TestStreams15) {
+ TestStreams(20, 15);
+ }
+
+ Y_UNIT_TEST(TestStreams16) {
+ TestStreams(20, 16);
+ }
+
+ Y_UNIT_TEST(TestStreams17) {
+ TestStreams(20, 17);
+ }
+
+ Y_UNIT_TEST(TestStreams18) {
+ TestStreams(20, 18);
+ }
+
+ Y_UNIT_TEST(TestStreams19) {
+ TestStreams(20, 19);
+ }
+
+ Y_UNIT_TEST(TestMaxPossibleDecompressedSize) {
+
+ UNIT_ASSERT_VALUES_EQUAL(GetMaxPossibleDecompressedLength(), Max<size_t>());
+
+ TVector<char> input(10001, ' ');
+ TCodecList codecs = ListAllCodecs();
+ SetMaxPossibleDecompressedLength(10000);
+
+ for (const auto& codec : codecs) {
+ const ICodec* c = Codec(codec);
+ TBuffer inputBuffer(input.data(), input.size());
+ TBuffer output;
+ TBuffer decompressed;
+ c->Encode(inputBuffer, output);
+ UNIT_ASSERT_EXCEPTION(c->Decode(output, decompressed), yexception);
+ }
+
+ // restore status quo
+ SetMaxPossibleDecompressedLength(Max<size_t>());
+ }
+
+ Y_UNIT_TEST(TestListAllCodecs) {
+ static const TString ALL_CODECS =
+ "brotli_1,brotli_10,brotli_11,brotli_2,brotli_3,brotli_4,brotli_5,brotli_6,brotli_7,brotli_8,brotli_9,"
+
+ "bzip2,bzip2-1,bzip2-2,bzip2-3,bzip2-4,bzip2-5,bzip2-6,bzip2-7,bzip2-8,bzip2-9,"
+
+ "fastlz,fastlz-0,fastlz-1,fastlz-2,"
+
+ "lz4,lz4-fast-fast,lz4-fast-safe,lz4-fast10-fast,lz4-fast10-safe,lz4-fast11-fast,lz4-fast11-safe,"
+ "lz4-fast12-fast,lz4-fast12-safe,lz4-fast13-fast,lz4-fast13-safe,lz4-fast14-fast,lz4-fast14-safe,"
+ "lz4-fast15-fast,lz4-fast15-safe,lz4-fast16-fast,lz4-fast16-safe,lz4-fast17-fast,lz4-fast17-safe,"
+ "lz4-fast18-fast,lz4-fast18-safe,lz4-fast19-fast,lz4-fast19-safe,lz4-fast20-fast,lz4-fast20-safe,"
+ "lz4-hc-fast,lz4-hc-safe,lz4fast,lz4hc,"
+
+ "lzma,lzma-0,lzma-1,lzma-2,lzma-3,lzma-4,lzma-5,lzma-6,lzma-7,lzma-8,lzma-9,"
+
+ "null,"
+
+ "snappy,"
+
+ "zlib,zlib-0,zlib-1,zlib-2,zlib-3,zlib-4,zlib-5,zlib-6,zlib-7,zlib-8,zlib-9,"
+
+ "zstd06_1,zstd06_10,zstd06_11,zstd06_12,zstd06_13,zstd06_14,zstd06_15,zstd06_16,zstd06_17,zstd06_18,"
+ "zstd06_19,zstd06_2,zstd06_20,zstd06_21,zstd06_22,zstd06_3,zstd06_4,zstd06_5,zstd06_6,zstd06_7,zstd06_8,"
+ "zstd06_9,"
+
+ "zstd08_1,zstd08_10,zstd08_11,zstd08_12,zstd08_13,zstd08_14,zstd08_15,zstd08_16,zstd08_17,zstd08_18,"
+ "zstd08_19,zstd08_2,zstd08_20,zstd08_21,zstd08_22,zstd08_3,zstd08_4,zstd08_5,zstd08_6,zstd08_7,zstd08_8,"
+ "zstd08_9,zstd_1,zstd_10,zstd_11,zstd_12,zstd_13,zstd_14,zstd_15,zstd_16,zstd_17,zstd_18,zstd_19,zstd_2,"
+ "zstd_20,zstd_21,zstd_22,zstd_3,zstd_4,zstd_5,zstd_6,zstd_7,zstd_8,zstd_9";
+
+ UNIT_ASSERT_VALUES_EQUAL(ALL_CODECS, JoinSeq(",", ListAllCodecs()));
+ }
+
+ Y_UNIT_TEST(TestEncodeDecodeIntoString) {
+ TStringBuf data = "na gorshke sidel korol";
+
+ TCodecList codecs = ListAllCodecs();
+ for (const auto& codec : codecs) {
+ const ICodec* c = Codec(codec);
+ TString encoded = c->Encode(data);
+ TString decoded = c->Decode(encoded);
+
+ UNIT_ASSERT_VALUES_EQUAL(decoded, data);
+ }
+ }
+}
diff --git a/library/cpp/blockcodecs/core/codecs.cpp b/library/cpp/blockcodecs/core/codecs.cpp
new file mode 100644
index 00000000000..21506e812b4
--- /dev/null
+++ b/library/cpp/blockcodecs/core/codecs.cpp
@@ -0,0 +1,148 @@
+#include "codecs.h"
+#include "common.h"
+#include "register.h"
+
+#include <util/ysaveload.h>
+#include <util/stream/null.h>
+#include <util/stream/mem.h>
+#include <util/string/cast.h>
+#include <util/string/join.h>
+#include <util/system/align.h>
+#include <util/system/unaligned_mem.h>
+#include <util/generic/hash.h>
+#include <util/generic/cast.h>
+#include <util/generic/deque.h>
+#include <util/generic/buffer.h>
+#include <util/generic/array_ref.h>
+#include <util/generic/singleton.h>
+#include <util/generic/algorithm.h>
+#include <util/generic/mem_copy.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+
+ struct TCodecFactory {
+ inline TCodecFactory() {
+ Add(&Null);
+ }
+
+ inline const ICodec* Find(const TStringBuf& name) const {
+ auto it = Registry.find(name);
+
+ if (it == Registry.end()) {
+ ythrow TNotFound() << "can not found " << name << " codec";
+ }
+
+ return it->second;
+ }
+
+ inline void ListCodecs(TCodecList& lst) const {
+ for (const auto& it : Registry) {
+ lst.push_back(it.first);
+ }
+
+ Sort(lst.begin(), lst.end());
+ }
+
+ inline void Add(ICodec* codec) {
+ Registry[codec->Name()] = codec;
+ }
+
+ inline void Add(TCodecPtr codec) {
+ Codecs.push_back(std::move(codec));
+ Add(Codecs.back().Get());
+ }
+
+ inline void Alias(TStringBuf from, TStringBuf to) {
+ Tmp.emplace_back(from);
+ Registry[Tmp.back()] = Registry[to];
+ }
+
+ TDeque<TString> Tmp;
+ TNullCodec Null;
+ TVector<TCodecPtr> Codecs;
+ typedef THashMap<TStringBuf, ICodec*> TRegistry;
+ TRegistry Registry;
+
+ // SEARCH-8344: Global decompressed size limiter (to prevent remote DoS)
+ size_t MaxPossibleDecompressedLength = Max<size_t>();
+ };
+}
+
+const ICodec* NBlockCodecs::Codec(const TStringBuf& name) {
+ return Singleton<TCodecFactory>()->Find(name);
+}
+
+TCodecList NBlockCodecs::ListAllCodecs() {
+ TCodecList ret;
+
+ Singleton<TCodecFactory>()->ListCodecs(ret);
+
+ return ret;
+}
+
+TString NBlockCodecs::ListAllCodecsAsString() {
+ return JoinSeq(TStringBuf(","), ListAllCodecs());
+}
+
+void NBlockCodecs::RegisterCodec(TCodecPtr codec) {
+ Singleton<TCodecFactory>()->Add(std::move(codec));
+}
+
+void NBlockCodecs::RegisterAlias(TStringBuf from, TStringBuf to) {
+ Singleton<TCodecFactory>()->Alias(from, to);
+}
+
+void NBlockCodecs::SetMaxPossibleDecompressedLength(size_t maxPossibleDecompressedLength) {
+ Singleton<TCodecFactory>()->MaxPossibleDecompressedLength = maxPossibleDecompressedLength;
+}
+
+size_t NBlockCodecs::GetMaxPossibleDecompressedLength() {
+ return Singleton<TCodecFactory>()->MaxPossibleDecompressedLength;
+}
+
+size_t ICodec::GetDecompressedLength(const TData& in) const {
+ const size_t len = DecompressedLength(in);
+
+ Y_ENSURE(
+ len <= NBlockCodecs::GetMaxPossibleDecompressedLength(),
+ "Attempt to decompress the block that is larger than maximum possible decompressed length, "
+ "see SEARCH-8344 for details. "
+ );
+ return len;
+}
+
+void ICodec::Encode(const TData& in, TBuffer& out) const {
+ const size_t maxLen = MaxCompressedLength(in);
+
+ out.Reserve(maxLen);
+ out.Resize(Compress(in, out.Data()));
+}
+
+void ICodec::Decode(const TData& in, TBuffer& out) const {
+ const size_t len = GetDecompressedLength(in);
+
+ out.Reserve(len);
+ out.Resize(Decompress(in, out.Data()));
+}
+
+void ICodec::Encode(const TData& in, TString& out) const {
+ const size_t maxLen = MaxCompressedLength(in);
+ out.ReserveAndResize(maxLen);
+
+ size_t actualLen = Compress(in, out.begin());
+ Y_ASSERT(actualLen <= maxLen);
+ out.resize(actualLen);
+}
+
+void ICodec::Decode(const TData& in, TString& out) const {
+ const size_t maxLen = GetDecompressedLength(in);
+ out.ReserveAndResize(maxLen);
+
+ size_t actualLen = Decompress(in, out.begin());
+ Y_ASSERT(actualLen <= maxLen);
+ out.resize(actualLen);
+}
+
+ICodec::~ICodec() = default;
diff --git a/library/cpp/blockcodecs/core/codecs.h b/library/cpp/blockcodecs/core/codecs.h
new file mode 100644
index 00000000000..9c93c002748
--- /dev/null
+++ b/library/cpp/blockcodecs/core/codecs.h
@@ -0,0 +1,90 @@
+#pragma once
+
+#include <util/generic/buffer.h>
+#include <util/generic/strbuf.h>
+#include <util/generic/string.h>
+#include <util/generic/typetraits.h>
+#include <util/generic/vector.h>
+#include <util/generic/yexception.h>
+
+namespace NBlockCodecs {
+ struct TData: public TStringBuf {
+ inline TData() = default;
+
+ Y_HAS_MEMBER(Data);
+ Y_HAS_MEMBER(Size);
+
+ template <class T, std::enable_if_t<!THasSize<T>::value || !THasData<T>::value, int> = 0>
+ inline TData(const T& t)
+ : TStringBuf((const char*)t.data(), t.size())
+ {
+ }
+
+ template <class T, std::enable_if_t<THasSize<T>::value && THasData<T>::value, int> = 0>
+ inline TData(const T& t)
+ : TStringBuf((const char*)t.Data(), t.Size())
+ {
+ }
+ };
+
+ struct TCodecError: public yexception {
+ };
+
+ struct TNotFound: public TCodecError {
+ };
+
+ struct TDataError: public TCodecError {
+ };
+
+ struct ICodec {
+ virtual ~ICodec();
+
+ // main interface
+ virtual size_t DecompressedLength(const TData& in) const = 0;
+ virtual size_t MaxCompressedLength(const TData& in) const = 0;
+ virtual size_t Compress(const TData& in, void* out) const = 0;
+ virtual size_t Decompress(const TData& in, void* out) const = 0;
+
+ virtual TStringBuf Name() const noexcept = 0;
+
+ // some useful helpers
+ void Encode(const TData& in, TBuffer& out) const;
+ void Decode(const TData& in, TBuffer& out) const;
+
+ void Encode(const TData& in, TString& out) const;
+ void Decode(const TData& in, TString& out) const;
+
+ inline TString Encode(const TData& in) const {
+ TString out;
+
+ Encode(in, out);
+
+ return out;
+ }
+
+ inline TString Decode(const TData& in) const {
+ TString out;
+
+ Decode(in, out);
+
+ return out;
+ }
+ private:
+ size_t GetDecompressedLength(const TData& in) const;
+ };
+
+ using TCodecPtr = THolder<ICodec>;
+
+ const ICodec* Codec(const TStringBuf& name);
+
+ // some aux methods
+ typedef TVector<TStringBuf> TCodecList;
+ TCodecList ListAllCodecs();
+ TString ListAllCodecsAsString();
+
+ // SEARCH-8344: Get the size of max possible decompressed block
+ size_t GetMaxPossibleDecompressedLength();
+ // SEARCH-8344: Globally set the size of max possible decompressed block
+ void SetMaxPossibleDecompressedLength(size_t maxPossibleDecompressedLength);
+
+}
diff --git a/library/cpp/blockcodecs/core/common.h b/library/cpp/blockcodecs/core/common.h
new file mode 100644
index 00000000000..f05df4d3341
--- /dev/null
+++ b/library/cpp/blockcodecs/core/common.h
@@ -0,0 +1,105 @@
+#pragma once
+
+#include "codecs.h"
+
+#include <util/ysaveload.h>
+#include <util/stream/null.h>
+#include <util/stream/mem.h>
+#include <util/string/cast.h>
+#include <util/string/join.h>
+#include <util/system/align.h>
+#include <util/system/unaligned_mem.h>
+#include <util/generic/hash.h>
+#include <util/generic/cast.h>
+#include <util/generic/buffer.h>
+#include <util/generic/array_ref.h>
+#include <util/generic/singleton.h>
+#include <util/generic/algorithm.h>
+#include <util/generic/mem_copy.h>
+
+namespace NBlockCodecs {
+ struct TDecompressError: public TDataError {
+ TDecompressError(int code) {
+ *this << "cannot decompress (errcode " << code << ")";
+ }
+
+ TDecompressError(size_t exp, size_t real) {
+ *this << "broken input (expected len: " << exp << ", got: " << real << ")";
+ }
+ };
+
+ struct TCompressError: public TDataError {
+ TCompressError(int code) {
+ *this << "cannot compress (errcode " << code << ")";
+ }
+ };
+
+ struct TNullCodec: public ICodec {
+ size_t DecompressedLength(const TData& in) const override {
+ return in.size();
+ }
+
+ size_t MaxCompressedLength(const TData& in) const override {
+ return in.size();
+ }
+
+ size_t Compress(const TData& in, void* out) const override {
+ MemCopy((char*)out, in.data(), in.size());
+
+ return in.size();
+ }
+
+ size_t Decompress(const TData& in, void* out) const override {
+ MemCopy((char*)out, in.data(), in.size());
+
+ return in.size();
+ }
+
+ TStringBuf Name() const noexcept override {
+ return TStringBuf("null");
+ }
+ };
+
+ template <class T>
+ struct TAddLengthCodec: public ICodec {
+ static inline void Check(const TData& in) {
+ if (in.size() < sizeof(ui64)) {
+ ythrow TDataError() << "too small input";
+ }
+ }
+
+ size_t DecompressedLength(const TData& in) const override {
+ Check(in);
+
+ return ReadUnaligned<ui64>(in.data());
+ }
+
+ size_t MaxCompressedLength(const TData& in) const override {
+ return T::DoMaxCompressedLength(in.size()) + sizeof(ui64);
+ }
+
+ size_t Compress(const TData& in, void* out) const override {
+ ui64* ptr = (ui64*)out;
+
+ WriteUnaligned<ui64>(ptr, (ui64) in.size());
+
+ return Base()->DoCompress(!in ? TData(TStringBuf("")) : in, ptr + 1) + sizeof(*ptr);
+ }
+
+ size_t Decompress(const TData& in, void* out) const override {
+ Check(in);
+
+ const auto len = ReadUnaligned<ui64>(in.data());
+
+ if (!len)
+ return 0;
+
+ Base()->DoDecompress(TData(in).Skip(sizeof(len)), out, len);
+ return len;
+ }
+
+ inline const T* Base() const noexcept {
+ return static_cast<const T*>(this);
+ }
+ };
+}
diff --git a/library/cpp/blockcodecs/core/register.h b/library/cpp/blockcodecs/core/register.h
new file mode 100644
index 00000000000..fa1186dd705
--- /dev/null
+++ b/library/cpp/blockcodecs/core/register.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include "codecs.h"
+
+namespace NBlockCodecs{
+
+ void RegisterCodec(TCodecPtr codec);
+ void RegisterAlias(TStringBuf from, TStringBuf to);
+
+}
diff --git a/library/cpp/blockcodecs/core/stream.cpp b/library/cpp/blockcodecs/core/stream.cpp
new file mode 100644
index 00000000000..4f7db3c32be
--- /dev/null
+++ b/library/cpp/blockcodecs/core/stream.cpp
@@ -0,0 +1,212 @@
+#include "stream.h"
+#include "codecs.h"
+
+#include <util/digest/murmur.h>
+#include <util/generic/scope.h>
+#include <util/generic/cast.h>
+#include <util/generic/hash.h>
+#include <util/generic/singleton.h>
+#include <util/stream/mem.h>
+#include <util/ysaveload.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+ constexpr size_t MAX_BUF_LEN = 128 * 1024 * 1024;
+
+ typedef ui16 TCodecID;
+ typedef ui64 TBlockLen;
+
+ struct TIds {
+ inline TIds() {
+ const TCodecList lst = ListAllCodecs();
+
+ for (size_t i = 0; i < lst.size(); ++i) {
+ const ICodec* c = Codec(lst[i]);
+
+ ByID[CodecID(c)] = c;
+ }
+ }
+
+ static inline TCodecID CodecID(const ICodec* c) {
+ const TStringBuf name = c->Name();
+
+ union {
+ ui16 Parts[2];
+ ui32 Data;
+ } x;
+
+ x.Data = MurmurHash<ui32>(name.data(), name.size());
+
+ return x.Parts[1] ^ x.Parts[0];
+ }
+
+ inline const ICodec* Find(TCodecID id) const {
+ TByID::const_iterator it = ByID.find(id);
+
+ if (it != ByID.end()) {
+ return it->second;
+ }
+
+ ythrow yexception() << "can not find codec by id " << id;
+ }
+
+ typedef THashMap<TCodecID, const ICodec*> TByID;
+ TByID ByID;
+ };
+
+ TCodecID CodecID(const ICodec* c) {
+ return TIds::CodecID(c);
+ }
+
+ const ICodec* CodecByID(TCodecID id) {
+ return Singleton<TIds>()->Find(id);
+ }
+}
+
+TCodedOutput::TCodedOutput(IOutputStream* out, const ICodec* c, size_t bufLen)
+ : C_(c)
+ , D_(bufLen)
+ , S_(out)
+{
+ if (bufLen > MAX_BUF_LEN) {
+ ythrow yexception() << TStringBuf("too big buffer size: ") << bufLen;
+ }
+}
+
+TCodedOutput::~TCodedOutput() {
+ try {
+ Finish();
+ } catch (...) {
+ }
+}
+
+void TCodedOutput::DoWrite(const void* buf, size_t len) {
+ const char* in = (const char*)buf;
+
+ while (len) {
+ const size_t avail = D_.Avail();
+
+ if (len < avail) {
+ D_.Append(in, len);
+
+ return;
+ }
+
+ D_.Append(in, avail);
+
+ Y_ASSERT(!D_.Avail());
+
+ in += avail;
+ len -= avail;
+
+ Y_VERIFY(FlushImpl(), "flush on writing failed");
+ }
+}
+
+bool TCodedOutput::FlushImpl() {
+ const bool ret = !D_.Empty();
+ const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen);
+ O_.Reserve(C_->MaxCompressedLength(D_) + payload);
+
+ void* out = O_.Data() + payload;
+ const size_t olen = C_->Compress(D_, out);
+
+ {
+ TMemoryOutput mo(O_.Data(), payload);
+
+ ::Save(&mo, CodecID(C_));
+ ::Save(&mo, SafeIntegerCast<TBlockLen>(olen));
+ }
+
+ S_->Write(O_.Data(), payload + olen);
+
+ D_.Clear();
+ O_.Clear();
+
+ return ret;
+}
+
+void TCodedOutput::DoFlush() {
+ if (S_ && !D_.Empty()) {
+ FlushImpl();
+ }
+}
+
+void TCodedOutput::DoFinish() {
+ if (S_) {
+ Y_DEFER {
+ S_ = nullptr;
+ };
+
+ if (FlushImpl()) {
+ //always write zero-length block as eos marker
+ FlushImpl();
+ }
+ }
+}
+
+TDecodedInput::TDecodedInput(IInputStream* in)
+ : S_(in)
+ , C_(nullptr)
+{
+}
+
+TDecodedInput::TDecodedInput(IInputStream* in, const ICodec* codec)
+ : S_(in)
+ , C_(codec)
+{
+}
+
+TDecodedInput::~TDecodedInput() = default;
+
+size_t TDecodedInput::DoUnboundedNext(const void** ptr) {
+ if (!S_) {
+ return 0;
+ }
+
+ TCodecID codecId;
+ TBlockLen blockLen;
+
+ {
+ const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen);
+ char buf[32];
+
+ S_->LoadOrFail(buf, payload);
+
+ TMemoryInput in(buf, payload);
+
+ ::Load(&in, codecId);
+ ::Load(&in, blockLen);
+ }
+
+ if (!blockLen) {
+ S_ = nullptr;
+
+ return 0;
+ }
+
+ if (Y_UNLIKELY(blockLen > 1024 * 1024 * 1024)) {
+ ythrow yexception() << "block size exceeds 1 GiB";
+ }
+
+ TBuffer block;
+ block.Resize(blockLen);
+
+ S_->LoadOrFail(block.Data(), blockLen);
+
+ auto codec = CodecByID(codecId);
+
+ if (C_) {
+ Y_ENSURE(C_->Name() == codec->Name(), TStringBuf("incorrect stream codec"));
+ }
+
+ if (codec->DecompressedLength(block) > MAX_BUF_LEN) {
+ ythrow yexception() << "broken stream";
+ }
+
+ codec->Decode(block, D_);
+ *ptr = D_.Data();
+
+ return D_.Size();
+}
diff --git a/library/cpp/blockcodecs/core/stream.h b/library/cpp/blockcodecs/core/stream.h
new file mode 100644
index 00000000000..fd44ef88f2c
--- /dev/null
+++ b/library/cpp/blockcodecs/core/stream.h
@@ -0,0 +1,46 @@
+#pragma once
+
+#include <util/stream/walk.h>
+#include <util/stream/input.h>
+#include <util/stream/output.h>
+#include <util/stream/zerocopy.h>
+#include <util/generic/buffer.h>
+
+namespace NBlockCodecs {
+ struct ICodec;
+
+ class TCodedOutput: public IOutputStream {
+ public:
+ TCodedOutput(IOutputStream* out, const ICodec* c, size_t bufLen);
+ ~TCodedOutput() override;
+
+ private:
+ void DoWrite(const void* buf, size_t len) override;
+ void DoFlush() override;
+ void DoFinish() override;
+
+ bool FlushImpl();
+
+ private:
+ const ICodec* C_;
+ TBuffer D_;
+ TBuffer O_;
+ IOutputStream* S_;
+ };
+
+ class TDecodedInput: public IWalkInput {
+ public:
+ TDecodedInput(IInputStream* in);
+ TDecodedInput(IInputStream* in, const ICodec* codec);
+
+ ~TDecodedInput() override;
+
+ private:
+ size_t DoUnboundedNext(const void** ptr) override;
+
+ private:
+ TBuffer D_;
+ IInputStream* S_;
+ const ICodec* C_;
+ };
+}
diff --git a/library/cpp/blockcodecs/core/ya.make b/library/cpp/blockcodecs/core/ya.make
new file mode 100644
index 00000000000..069e15927bf
--- /dev/null
+++ b/library/cpp/blockcodecs/core/ya.make
@@ -0,0 +1,10 @@
+LIBRARY()
+
+OWNER(pg)
+
+SRCS(
+ codecs.cpp
+ stream.cpp
+)
+
+END()
diff --git a/library/cpp/blockcodecs/fuzz/main.cpp b/library/cpp/blockcodecs/fuzz/main.cpp
new file mode 100644
index 00000000000..763c6c5a10b
--- /dev/null
+++ b/library/cpp/blockcodecs/fuzz/main.cpp
@@ -0,0 +1,84 @@
+#include <contrib/libs/protobuf-mutator/src/libfuzzer/libfuzzer_macro.h>
+#include <google/protobuf/stubs/logging.h>
+
+#include <library/cpp/blockcodecs/codecs.h>
+#include <library/cpp/blockcodecs/fuzz/proto/case.pb.h>
+#include <library/cpp/blockcodecs/stream.h>
+
+#include <util/stream/input.h>
+#include <util/stream/length.h>
+#include <util/stream/mem.h>
+#include <util/stream/null.h>
+#include <util/stream/str.h>
+
+using NBlockCodecs::NFuzz::TPackUnpackCase;
+using NBlockCodecs::TCodedOutput;
+using NBlockCodecs::TDecodedInput;
+
+static void ValidateBufferSize(const ui32 size) {
+ Y_ENSURE(size > 0 && size <= 16ULL * 1024);
+}
+
+static void DoOnlyDecode(const TPackUnpackCase& case_) {
+ if (!case_.GetPacked()) {
+ return;
+ }
+
+ TMemoryInput mi(case_.GetData().data(), case_.GetData().size());
+ TDecodedInput di(&mi);
+ TNullOutput no;
+ TCountingOutput cno(&no);
+ TransferData(&di, &cno);
+}
+
+static void DoDecodeEncode(const TPackUnpackCase& case_) {
+ auto* const codec = NBlockCodecs::Codec(case_.GetCodecName());
+ Y_ENSURE(codec);
+
+ TMemoryInput mi(case_.GetData().data(), case_.GetData().size());
+ TDecodedInput di(&mi, codec);
+ TStringStream decoded;
+ TransferData(&di, &decoded);
+ TNullOutput no;
+ TCountingOutput cno(&no);
+ TCodedOutput co(&cno, codec, case_.GetBufferLength());
+ TransferData(&decoded, &co);
+ co.Flush();
+
+ Y_VERIFY((case_.GetData().size() > 0) == (cno.Counter() > 0));
+ Y_VERIFY((case_.GetData().size() > 0) == (decoded.Str().size() > 0));
+}
+
+static void DoEncodeDecode(const TPackUnpackCase& case_) {
+ auto* const codec = NBlockCodecs::Codec(case_.GetCodecName());
+ Y_ENSURE(codec);
+
+ TMemoryInput mi(case_.GetData().data(), case_.GetData().size());
+ TStringStream encoded;
+ TCodedOutput co(&encoded, codec, case_.GetBufferLength());
+ TransferData(&mi, &co);
+ co.Flush();
+ TStringStream decoded;
+ TDecodedInput di(&encoded, codec);
+ TransferData(&di, &decoded);
+
+ Y_VERIFY((case_.GetData().size() > 0) == (encoded.Str().size() > 0));
+ Y_VERIFY(case_.GetData() == decoded.Str());
+}
+
+DEFINE_BINARY_PROTO_FUZZER(const TPackUnpackCase& case_) {
+ try {
+ if (!case_.GetCodecName()) {
+ DoOnlyDecode(case_);
+ return;
+ }
+
+ ValidateBufferSize(case_.GetBufferLength());
+ if (case_.GetPacked()) {
+ DoDecodeEncode(case_);
+ } else {
+ DoEncodeDecode(case_);
+ }
+ } catch (const std::exception&) {
+ }
+}
diff --git a/library/cpp/blockcodecs/fuzz/proto/case.proto b/library/cpp/blockcodecs/fuzz/proto/case.proto
new file mode 100644
index 00000000000..85518b0da9e
--- /dev/null
+++ b/library/cpp/blockcodecs/fuzz/proto/case.proto
@@ -0,0 +1,10 @@
+syntax="proto3";
+
+package NBlockCodecs.NFuzz;
+
+message TPackUnpackCase {
+ bool Packed = 1;
+ uint32 BufferLength = 2;
+ string CodecName = 3;
+ bytes Data = 4;
+}
diff --git a/library/cpp/blockcodecs/fuzz/proto/ya.make b/library/cpp/blockcodecs/fuzz/proto/ya.make
new file mode 100644
index 00000000000..da840bc8c93
--- /dev/null
+++ b/library/cpp/blockcodecs/fuzz/proto/ya.make
@@ -0,0 +1,14 @@
+OWNER(
+ yazevnul
+ g:util
+)
+
+PROTO_LIBRARY()
+
+SRCS(
+ case.proto
+)
+
+EXCLUDE_TAGS(GO_PROTO)
+
+END()
diff --git a/library/cpp/blockcodecs/fuzz/ya.make b/library/cpp/blockcodecs/fuzz/ya.make
new file mode 100644
index 00000000000..bc8becc9e1d
--- /dev/null
+++ b/library/cpp/blockcodecs/fuzz/ya.make
@@ -0,0 +1,23 @@
+OWNER(
+ pg
+ g:util
+)
+
+IF (NOT MSVC)
+ FUZZ()
+
+ SIZE(MEDIUM)
+
+ SRCS(
+ main.cpp
+ )
+
+ PEERDIR(
+ contrib/libs/protobuf
+ contrib/libs/protobuf-mutator
+ library/cpp/blockcodecs
+ library/cpp/blockcodecs/fuzz/proto
+ )
+
+ END()
+ENDIF()
diff --git a/library/cpp/blockcodecs/stream.cpp b/library/cpp/blockcodecs/stream.cpp
new file mode 100644
index 00000000000..65e61e92214
--- /dev/null
+++ b/library/cpp/blockcodecs/stream.cpp
@@ -0,0 +1 @@
+#include "stream.h"
diff --git a/library/cpp/blockcodecs/stream.h b/library/cpp/blockcodecs/stream.h
new file mode 100644
index 00000000000..96c479cf7e1
--- /dev/null
+++ b/library/cpp/blockcodecs/stream.h
@@ -0,0 +1,3 @@
+#pragma once
+
+#include <library/cpp/blockcodecs/core/stream.h>
diff --git a/library/cpp/blockcodecs/ut/ya.make b/library/cpp/blockcodecs/ut/ya.make
new file mode 100644
index 00000000000..25b882c15b2
--- /dev/null
+++ b/library/cpp/blockcodecs/ut/ya.make
@@ -0,0 +1,19 @@
+UNITTEST_FOR(library/cpp/blockcodecs)
+
+OWNER(pg)
+
+FORK_TESTS()
+
+FORK_SUBTESTS()
+
+SPLIT_FACTOR(40)
+
+TIMEOUT(300)
+
+SIZE(MEDIUM)
+
+SRCS(
+ codecs_ut.cpp
+)
+
+END()
diff --git a/library/cpp/blockcodecs/ya.make b/library/cpp/blockcodecs/ya.make
new file mode 100644
index 00000000000..b8f03d5421c
--- /dev/null
+++ b/library/cpp/blockcodecs/ya.make
@@ -0,0 +1,27 @@
+LIBRARY()
+
+OWNER(pg)
+
+PEERDIR(
+ library/cpp/blockcodecs/core
+ library/cpp/blockcodecs/codecs/brotli
+ library/cpp/blockcodecs/codecs/bzip
+ library/cpp/blockcodecs/codecs/fastlz
+ library/cpp/blockcodecs/codecs/legacy_zstd06
+ library/cpp/blockcodecs/codecs/lz4
+ library/cpp/blockcodecs/codecs/lzma
+ library/cpp/blockcodecs/codecs/snappy
+ library/cpp/blockcodecs/codecs/zlib
+ library/cpp/blockcodecs/codecs/zstd
+)
+
+SRCS(
+ codecs.cpp
+ stream.cpp
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)