diff options
author | akozhikhov <akozhikhov@yandex-team.com> | 2023-12-11 02:53:55 +0300 |
---|---|---|
committer | akozhikhov <akozhikhov@yandex-team.com> | 2023-12-11 03:14:58 +0300 |
commit | 81ca8992fffd775c006627b82c211b30ea6f5bb2 (patch) | |
tree | 18b4072d8353c710cd78185728580d9a5464ea5a | |
parent | dc0db37252e88d8f0af234fd17e1e95622f6b123 (diff) | |
download | ydb-81ca8992fffd775c006627b82c211b30ea6f5bb2.tar.gz |
YT-20593: Support Zstd dictionary compression in yt/core
-rw-r--r-- | yt/yt/core/CMakeLists.darwin-arm64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/compression/codec.cpp | 1 | ||||
-rw-r--r-- | yt/yt/core/compression/codec.h | 1 | ||||
-rw-r--r-- | yt/yt/core/compression/dictionary_codec.cpp | 79 | ||||
-rw-r--r-- | yt/yt/core/compression/dictionary_codec.h | 112 | ||||
-rw-r--r-- | yt/yt/core/compression/public.h | 6 | ||||
-rw-r--r-- | yt/yt/core/compression/unittests/dictionary_compression_ut.cpp | 240 | ||||
-rw-r--r-- | yt/yt/core/compression/unittests/ya.make | 1 | ||||
-rw-r--r-- | yt/yt/core/compression/zstd.cpp | 386 | ||||
-rw-r--r-- | yt/yt/core/compression/zstd.h | 32 | ||||
-rw-r--r-- | yt/yt/core/misc/serialize-inl.h | 2 | ||||
-rw-r--r-- | yt/yt/core/ya.make | 1 |
16 files changed, 855 insertions, 11 deletions
diff --git a/yt/yt/core/CMakeLists.darwin-arm64.txt b/yt/yt/core/CMakeLists.darwin-arm64.txt index 21ed07c2d3..192a65f028 100644 --- a/yt/yt/core/CMakeLists.darwin-arm64.txt +++ b/yt/yt/core/CMakeLists.darwin-arm64.txt @@ -80,6 +80,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/brotli.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/bzip2.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/codec.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/dictionary_codec.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/stream.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lz.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lzma.cpp diff --git a/yt/yt/core/CMakeLists.darwin-x86_64.txt b/yt/yt/core/CMakeLists.darwin-x86_64.txt index 7a6160415a..0c1266e936 100644 --- a/yt/yt/core/CMakeLists.darwin-x86_64.txt +++ b/yt/yt/core/CMakeLists.darwin-x86_64.txt @@ -81,6 +81,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/brotli.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/bzip2.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/codec.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/dictionary_codec.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/stream.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lz.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lzma.cpp diff --git a/yt/yt/core/CMakeLists.linux-aarch64.txt b/yt/yt/core/CMakeLists.linux-aarch64.txt index fb60bd26a5..d2a2d8b067 100644 --- a/yt/yt/core/CMakeLists.linux-aarch64.txt +++ b/yt/yt/core/CMakeLists.linux-aarch64.txt @@ -81,6 +81,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/brotli.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/bzip2.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/codec.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/dictionary_codec.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/stream.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lz.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lzma.cpp diff --git a/yt/yt/core/CMakeLists.linux-x86_64.txt b/yt/yt/core/CMakeLists.linux-x86_64.txt index 02cb5341e7..66f859aadc 100644 --- a/yt/yt/core/CMakeLists.linux-x86_64.txt +++ b/yt/yt/core/CMakeLists.linux-x86_64.txt @@ -82,6 +82,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/brotli.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/bzip2.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/codec.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/dictionary_codec.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/stream.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lz.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lzma.cpp diff --git a/yt/yt/core/CMakeLists.windows-x86_64.txt b/yt/yt/core/CMakeLists.windows-x86_64.txt index dba46ac9ef..6c965d6cbf 100644 --- a/yt/yt/core/CMakeLists.windows-x86_64.txt +++ b/yt/yt/core/CMakeLists.windows-x86_64.txt @@ -80,6 +80,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/brotli.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/bzip2.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/codec.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/dictionary_codec.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/stream.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lz.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lzma.cpp diff --git a/yt/yt/core/compression/codec.cpp b/yt/yt/core/compression/codec.cpp index 36e789e26d..3edaae9de3 100644 --- a/yt/yt/core/compression/codec.cpp +++ b/yt/yt/core/compression/codec.cpp @@ -454,4 +454,3 @@ const std::vector<ECodec>& GetSupportedCodecIds() //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NCompression - diff --git a/yt/yt/core/compression/codec.h b/yt/yt/core/compression/codec.h index 57483645e5..ec102e6a55 100644 --- a/yt/yt/core/compression/codec.h +++ b/yt/yt/core/compression/codec.h @@ -40,4 +40,3 @@ const std::vector<ECodec>& GetSupportedCodecIds(); //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NCompression - diff --git a/yt/yt/core/compression/dictionary_codec.cpp b/yt/yt/core/compression/dictionary_codec.cpp new file mode 100644 index 0000000000..08b5ff2da7 --- /dev/null +++ b/yt/yt/core/compression/dictionary_codec.cpp @@ -0,0 +1,79 @@ +#include "dictionary_codec.h" +#include "zstd.h" + +namespace NYT::NCompression { + +using namespace NDetail; + +//////////////////////////////////////////////////////////////////////////////// + +class TZstdDictionaryCompressionCodec + : public IDictionaryCompressionCodec +{ +public: + int GetMinDictionarySize() const override + { + return ZstdGetMinDictionarySize(); + } + + int GetMaxCompressionLevel() const override + { + return ZstdGetMaxCompressionLevel(); + } + + int GetDefaultCompressionLevel() const override + { + return ZstdGetDefaultCompressionLevel(); + } + + TErrorOr<TSharedRef> TrainCompressionDictionary( + i64 dictionarySize, + const std::vector<TSharedRef>& samples) const override + { + return ZstdTrainCompressionDictionary(dictionarySize, samples); + } + + IDictionaryCompressorPtr CreateDictionaryCompressor( + const IDigestedCompressionDictionaryPtr& digestedCompressionDictionary) const override + { + return ZstdCreateDictionaryCompressor(digestedCompressionDictionary); + } + + IDictionaryDecompressorPtr CreateDictionaryDecompressor( + const IDigestedDecompressionDictionaryPtr& digestedDecompressionDictionary) const override + { + return ZstdCreateDictionaryDecompressor(digestedDecompressionDictionary); + } + + IDigestedCompressionDictionaryPtr CreateDigestedCompressionDictionary( + const TSharedRef& compressionDictionary, + int compressionLevel) const override + { + return ZstdCreateDigestedCompressionDictionary( + compressionDictionary, + compressionLevel); + } + + IDigestedDecompressionDictionaryPtr CreateDigestedDecompressionDictionary( + const TSharedRef& compressionDictionary) const override + { + return ZstdCreateDigestedDecompressionDictionary(compressionDictionary); + } + + TDictionaryCompressionFrameInfo GetFrameInfo(TRef input) const override + { + return ZstdGetFrameInfo(input); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +IDictionaryCompressionCodec* GetDictionaryCompressionCodec() +{ + static TZstdDictionaryCompressionCodec result; + return &result; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NCompression diff --git a/yt/yt/core/compression/dictionary_codec.h b/yt/yt/core/compression/dictionary_codec.h new file mode 100644 index 0000000000..8205fa2c50 --- /dev/null +++ b/yt/yt/core/compression/dictionary_codec.h @@ -0,0 +1,112 @@ +#pragma once + +#include "public.h" + +#include <library/cpp/yt/memory/ref.h> + +namespace NYT::NCompression { + +//////////////////////////////////////////////////////////////////////////////// + +//! Each compression frame (i.e. blob compressed via single call to IDictionaryCompressor) contains this header. +struct TDictionaryCompressionFrameInfo +{ + ui64 ContentSize; +}; + +//////////////////////////////////////////////////////////////////////////////// + +//! Compressor interface that is aware of compression context. +//! Thread affinity: single-threaded. +struct IDictionaryCompressor + : public TRefCounted +{ + //! Returns ref to compressed data. Memory will be allocated via #pool. + virtual TRef Compress( + TChunkedMemoryPool* pool, + TRef input) = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IDictionaryCompressor) + +//! Decompressor interface that is aware of decompression context. +//! Thread affinity: single-threaded. +struct IDictionaryDecompressor + : public TRefCounted +{ + //! Decompresses #input into #ouput. + //! Memory for output must be pre-allocated, its size can be infered from frame info. + virtual void Decompress( + TRef input, + TMutableRef output) = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IDictionaryDecompressor) + +//////////////////////////////////////////////////////////////////////////////// + +//! Dictionary digested and ready for compression. +//! Stores preprocessed dictionary data and can be used to create instance of IDictionaryCompressor. +//! May be used concurrently from multiple compressors. +//! Thread affinity: any. +struct IDigestedCompressionDictionary + : public TRefCounted +{ + virtual i64 GetMemoryUsage() const = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IDigestedCompressionDictionary) + +//! Dictionary digested and ready for decompression. +//! Stores preprocessed dictionary data and can be used to create instance of IDictionaryDecompressor. +//! May be used concurrently from multiple decompressors. +//! Thread affinity: any. +struct IDigestedDecompressionDictionary + : public TRefCounted +{ + virtual i64 GetMemoryUsage() const = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IDigestedDecompressionDictionary) + +//////////////////////////////////////////////////////////////////////////////// + +struct IDictionaryCompressionCodec +{ + virtual int GetMinDictionarySize() const = 0; + + virtual int GetMaxCompressionLevel() const = 0; + virtual int GetDefaultCompressionLevel() const = 0; + + //! Trains compression dictionary of size not exceeding #dictionarySize. + //! This dicionary may then be digested for (de)compression. + //! NB: May return null if training failed, e.g. due to lack of #samples + //! or no sufficient profit from using dictionary on them. + virtual TErrorOr<TSharedRef> TrainCompressionDictionary( + i64 dictionarySize, + const std::vector<TSharedRef>& samples) const = 0; + + //! NB: Digested dictionary data will not be copied. + //! (De)compressor will reference digested dictionary for safe access. + virtual IDictionaryCompressorPtr CreateDictionaryCompressor( + const IDigestedCompressionDictionaryPtr& digestedCompressionDictionary) const = 0; + virtual IDictionaryDecompressorPtr CreateDictionaryDecompressor( + const IDigestedDecompressionDictionaryPtr& digestedDecompressionDictionary) const = 0; + + // NB: Raw #compressionDictionary data will be copied and stored within digested dictionary in a preprocessed form. + //! #compressionLevel determines compression level that will be applied for each compression with that dictionary later on. + virtual IDigestedCompressionDictionaryPtr CreateDigestedCompressionDictionary( + const TSharedRef& compressionDictionary, + int compressionLevel) const = 0; + virtual IDigestedDecompressionDictionaryPtr CreateDigestedDecompressionDictionary( + const TSharedRef& compressionDictionary) const = 0; + + //! Parses header of compressed frame #input and returns specified frame info. + virtual TDictionaryCompressionFrameInfo GetFrameInfo(TRef input) const = 0; +}; + +IDictionaryCompressionCodec* GetDictionaryCompressionCodec(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NCompression diff --git a/yt/yt/core/compression/public.h b/yt/yt/core/compression/public.h index c0921f70a8..7e8c585c63 100644 --- a/yt/yt/core/compression/public.h +++ b/yt/yt/core/compression/public.h @@ -8,6 +8,12 @@ namespace NYT::NCompression { struct ICodec; +struct TDictionaryCompressionFrameInfo; +DECLARE_REFCOUNTED_STRUCT(IDictionaryCompressor) +DECLARE_REFCOUNTED_STRUCT(IDictionaryDecompressor) +DECLARE_REFCOUNTED_STRUCT(IDigestedCompressionDictionary) +DECLARE_REFCOUNTED_STRUCT(IDigestedDecompressionDictionary) + DEFINE_AMBIGUOUS_ENUM_WITH_UNDERLYING_TYPE(ECodec, i8, ((None) (0)) ((Snappy) (1)) diff --git a/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp b/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp new file mode 100644 index 0000000000..c96d1bc40c --- /dev/null +++ b/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp @@ -0,0 +1,240 @@ +#include <yt/yt/core/test_framework/framework.h> + +#include <yt/yt/core/compression/dictionary_codec.h> + +#include <yt/yt/core/misc/blob.h> +#include <yt/yt/core/misc/error.h> +#include <yt/yt/core/misc/serialize.h> + +#include <library/cpp/yt/memory/chunked_memory_pool.h> + +#include <util/random/fast.h> + +#include <contrib/libs/zstd/lib/zstd_errors.h> +#define ZSTD_STATIC_LINKING_ONLY +#include <contrib/libs/zstd/include/zstd.h> + +namespace NYT::NCompression { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +struct TCompressionContext +{ + IDictionaryCompressorPtr Compressor; + IDictionaryDecompressorPtr Decompressor; +}; + +class TDictionaryCompressionTest + : public ::testing::Test +{ +protected: + TDictionaryCompressionTest() + : Rng_(100) + { } + + void SetUp() override + { + FixedSequence_ = GenerateRandomSequence(100); + YT_VERIFY(FixedSequence_.Size() == 100); + for (int sampleIndex = 0; sampleIndex < 1000; ++sampleIndex) { + Samples_.push_back(DoGenerateSample()); + } + + CompressionDictionary_ = GetDictionaryCompressionCodec()->TrainCompressionDictionary( + 2 * GetDictionaryCompressionCodec()->GetMinDictionarySize(), + Samples_); + } + + const std::vector<TSharedRef>& GetSamples() const + { + return Samples_; + } + + TSharedRef GenerateNewSample() + { + return DoGenerateSample(); + } + + const TErrorOr<TSharedRef>& GetCompressionDictionary() const + { + return CompressionDictionary_; + } + + TCompressionContext CreateCompressionContext() const + { + auto digestedCompressionDictionary = GetDictionaryCompressionCodec()->CreateDigestedCompressionDictionary( + GetCompressionDictionary().Value(), + GetDictionaryCompressionCodec()->GetDefaultCompressionLevel()); + auto compressor = GetDictionaryCompressionCodec()->CreateDictionaryCompressor(digestedCompressionDictionary); + + auto digestedDecompressionDictionary = GetDictionaryCompressionCodec()->CreateDigestedDecompressionDictionary( + GetCompressionDictionary().Value()); + auto decompressor = GetDictionaryCompressionCodec()->CreateDictionaryDecompressor(digestedDecompressionDictionary); + + // NB: We do not need to store digested dictionaries as they must be referenced within (de)compressor. + return { + .Compressor = compressor, + .Decompressor = decompressor, + }; + } + +private: + TFastRng64 Rng_; + + TSharedRef FixedSequence_; + std::vector<TSharedRef> Samples_; + TErrorOr<TSharedRef> CompressionDictionary_; + + + TSharedRef GenerateRandomSequence(int size) + { + int alignedSize = AlignUp<int>(size, 8); + TBlob blob(GetRefCountedTypeCookie<TDefaultBlobTag>(), alignedSize); + auto* ptr = blob.Begin(); + while (ptr - blob.Begin() < alignedSize) { + auto value = Rng_.GenRand(); + WritePod(ptr, value); + } + + blob.Resize(size); + return TSharedRef::FromBlob(std::move(blob)); + } + + TSharedRef DoGenerateSample() + { + int segmentCount = Rng_.GenRand() % 4 + 1; + std::vector<int> segmentSizes(segmentCount); + for (int segmentIndex = 0; segmentIndex < segmentCount; ++segmentIndex) { + segmentSizes[segmentIndex] = std::max<int>(1, Rng_.GenRand() % FixedSequence_.Size()); + } + + TBlob blob( + GetRefCountedTypeCookie<TDefaultBlobTag>(), + std::accumulate(segmentSizes.begin(), segmentSizes.end(), 0)); + auto* ptr = blob.Begin(); + + for (int segmentIndex = 0; segmentIndex < segmentCount; ++segmentIndex) { + if (Rng_.GenRand() % 2 == 0) { + WriteRef(ptr, TRef(FixedSequence_.Begin(), segmentSizes[segmentIndex])); + } else { + auto randomSequence = GenerateRandomSequence(segmentSizes[segmentIndex]); + WriteRef(ptr, randomSequence); + } + } + YT_VERIFY(ptr == blob.End()); + + return TSharedRef::FromBlob(std::move(blob)); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +TEST_F(TDictionaryCompressionTest, TestMinDictionaryCompressionSize) +{ + EXPECT_TRUE(GetCompressionDictionary().IsOK()); + + auto dictionarySize = GetDictionaryCompressionCodec()->GetMinDictionarySize() >> 1; + EXPECT_GT(dictionarySize, 0); + auto dictionaryOrError = GetDictionaryCompressionCodec()->TrainCompressionDictionary( + dictionarySize, + GetSamples()); + EXPECT_FALSE(dictionaryOrError.IsOK()); + + auto errorAttribute = dictionaryOrError.Attributes().Find<int>("zstd_error_code"); + EXPECT_TRUE(errorAttribute); + EXPECT_EQ(ZSTD_ErrorCode::ZSTD_error_dstSize_tooSmall, errorAttribute); +} + +TEST_F(TDictionaryCompressionTest, TestCompressDecompressSimple) +{ + EXPECT_TRUE(GetCompressionDictionary().IsOK()); + + TChunkedMemoryPool pool; + auto context = CreateCompressionContext(); + + auto data = GenerateNewSample(); + EXPECT_LT(0, std::ssize(data)); + auto compressedData = context.Compressor->Compress(&pool, data); + auto* decompressedData = pool.AllocateUnaligned(data.Size()); + context.Decompressor->Decompress(compressedData, TMutableRef(decompressedData, data.Size())); + + EXPECT_EQ(TStringBuf(data.Begin(), data.Size()), TStringBuf(decompressedData, data.Size())); +} + +TEST_F(TDictionaryCompressionTest, TestFrameInfo) +{ + EXPECT_TRUE(GetCompressionDictionary().IsOK()); + + TChunkedMemoryPool pool; + auto context = CreateCompressionContext(); + + auto data = GenerateNewSample(); + auto compressedData = context.Compressor->Compress(&pool, data); + // This is fine as long as seed is fixed. + EXPECT_LT(compressedData.Size(), data.Size()); + + auto frameInfo = GetDictionaryCompressionCodec()->GetFrameInfo(compressedData); + EXPECT_EQ(data.Size(), frameInfo.ContentSize); +} + +TEST_F(TDictionaryCompressionTest, TestCompressionRatio) +{ + EXPECT_TRUE(GetCompressionDictionary().IsOK()); + + TChunkedMemoryPool pool; + auto context = CreateCompressionContext(); + + int compressedSize = 0; + int decompressedSize = 0; + for (int index = 0; index < 1000; ++index) { + auto data = GenerateNewSample(); + auto compressedData = context.Compressor->Compress(&pool, data); + auto* decompressedData = pool.AllocateUnaligned(data.Size()); + context.Decompressor->Decompress(compressedData, TMutableRef(decompressedData, data.Size())); + EXPECT_EQ(TStringBuf(data.Begin(), data.Size()), TStringBuf(decompressedData, data.Size())); + + compressedSize += compressedData.Size(); + decompressedSize += data.Size(); + } + + EXPECT_LT(compressedSize, 0.8 * decompressedSize); +} + +TEST_F(TDictionaryCompressionTest, ModifiedFrameHeader) +{ + EXPECT_TRUE(GetCompressionDictionary().IsOK()); + + TChunkedMemoryPool pool; + auto context = CreateCompressionContext(); + + auto data = GenerateNewSample(); + auto compressedData = context.Compressor->Compress(&pool, data); + + ZSTD_frameHeader frameHeader; + auto result = ZSTD_getFrameHeader_advanced( + &frameHeader, + compressedData.Begin(), + compressedData.Size(), + ZSTD_f_zstd1); + EXPECT_TRUE(ZSTD_isError(result)); + // Magic number shall not be written to the frame. + EXPECT_EQ(ZSTD_ErrorCode::ZSTD_error_prefix_unknown, ZSTD_getErrorCode(result)); + + result = ZSTD_getFrameHeader_advanced( + &frameHeader, + compressedData.Begin(), + compressedData.Size(), + ZSTD_f_zstd1_magicless); + EXPECT_FALSE(ZSTD_isError(result)); + // These are explicitly disabled. + EXPECT_EQ(0u, frameHeader.dictID); + EXPECT_EQ(0u, frameHeader.checksumFlag); + // We do not use other frame types. + EXPECT_EQ(ZSTD_frame, frameHeader.frameType); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NCompression diff --git a/yt/yt/core/compression/unittests/ya.make b/yt/yt/core/compression/unittests/ya.make index 079474b5ef..ec506bdc6f 100644 --- a/yt/yt/core/compression/unittests/ya.make +++ b/yt/yt/core/compression/unittests/ya.make @@ -10,6 +10,7 @@ PROTO_NAMESPACE(yt) SRCS( codec_ut.cpp + dictionary_compression_ut.cpp stream_ut.cpp ) diff --git a/yt/yt/core/compression/zstd.cpp b/yt/yt/core/compression/zstd.cpp index 3ee31b31ac..24351eaca9 100644 --- a/yt/yt/core/compression/zstd.cpp +++ b/yt/yt/core/compression/zstd.cpp @@ -1,11 +1,18 @@ #include "zstd.h" +#include "codec.h" +#include "dictionary_codec.h" #include "private.h" #include <yt/yt/core/misc/blob.h> #include <yt/yt/core/misc/finally.h> +#include <library/cpp/yt/memory/chunked_memory_pool.h> + +#include <contrib/libs/zstd/lib/zstd_errors.h> #define ZSTD_STATIC_LINKING_ONLY #include <contrib/libs/zstd/include/zstd.h> +#define ZDICT_STATIC_LINKING_ONLY +#include <contrib/libs/zstd/include/zdict.h> namespace NYT::NCompression::NDetail { @@ -20,6 +27,15 @@ struct TZstdCompressBufferTag //////////////////////////////////////////////////////////////////////////////// +void VerifyError(size_t result) +{ + YT_LOG_FATAL_IF(ZSTD_isError(result), + "Zstd compression failed (Error: %v)", + ZSTD_getErrorName(result)); +} + +//////////////////////////////////////////////////////////////////////////////// + void ZstdCompress(int level, TSource* source, TBlob* output) { ui64 totalInputSize = source->Available(); @@ -33,11 +49,6 @@ void ZstdCompress(int level, TSource* source, TBlob* output) curOutputPos += sizeof(totalInputSize); } - auto checkError = [] (size_t result) { - YT_LOG_FATAL_IF(ZSTD_isError(result), "Zstd compression failed (Error: %v)", - ZSTD_getErrorName(result)); - }; - auto context = ZSTD_createCCtx(); auto contextGuard = Finally([&] () { ZSTD_freeCCtx(context); @@ -45,7 +56,7 @@ void ZstdCompress(int level, TSource* source, TBlob* output) { auto result = ZSTD_CCtx_setParameter(context, ZSTD_c_compressionLevel, level); - checkError(result); + VerifyError(result); } struct CompressResult @@ -61,7 +72,7 @@ void ZstdCompress(int level, TSource* source, TBlob* output) ZSTD_outBuffer zstdOutput = {output->Begin(), output->Size(), curOutputPos}; size_t result = ZSTD_compressStream2(context, &zstdOutput, &zstdInput, mode); - checkError(result); + VerifyError(result); curOutputPos = zstdOutput.pos; return CompressResult{ @@ -173,5 +184,366 @@ void ZstdDecompress(TSource* source, TBlob* output) //////////////////////////////////////////////////////////////////////////////// +int ZstdGetMinDictionarySize() +{ + return ZDICT_DICTSIZE_MIN; +} + +int ZstdGetMaxCompressionLevel() +{ + return ZSTD_maxCLevel(); +} + +int ZstdGetDefaultCompressionLevel() +{ + return ZSTD_defaultCLevel(); +} + +TDictionaryCompressionFrameInfo ZstdGetFrameInfo(TRef input) +{ + YT_VERIFY(input.Size() >= ZSTD_FRAMEHEADERSIZE_MIN(ZSTD_f_zstd1_magicless)); + + ZSTD_frameHeader frameHeader; + auto result = ZSTD_getFrameHeader_advanced( + &frameHeader, + input.Begin(), + input.Size(), + ZSTD_f_zstd1_magicless); + YT_VERIFY(result == 0); + + return { + .ContentSize = frameHeader.frameContentSize, + }; +} + +//////////////////////////////////////////////////////////////////////////////// + +DECLARE_REFCOUNTED_CLASS(TDigestedCompressionDictionary) + +class TDigestedCompressionDictionary + : public IDigestedCompressionDictionary + , private TNonCopyable +{ +public: + TDigestedCompressionDictionary(ZSTD_CDict* digestedDictionary) + : DigestedDictionary_(digestedDictionary) + { + YT_VERIFY(DigestedDictionary_); + } + + ~TDigestedCompressionDictionary() + { + ZSTD_freeCDict(DigestedDictionary_); + } + + i64 GetMemoryUsage() const override + { + return ZSTD_sizeof_CDict(DigestedDictionary_); + } + + ZSTD_CDict* GetDigestedDictionary() const + { + return DigestedDictionary_; + } + +private: + ZSTD_CDict* const DigestedDictionary_; +}; + +DEFINE_REFCOUNTED_TYPE(TDigestedCompressionDictionary) + +//////////////////////////////////////////////////////////////////////////////// + +DECLARE_REFCOUNTED_CLASS(TDigestedDecompressionDictionary) + +class TDigestedDecompressionDictionary + : public IDigestedDecompressionDictionary + , private TNonCopyable +{ +public: + TDigestedDecompressionDictionary(ZSTD_DDict* digestedDictionary) + : DigestedDictionary_(digestedDictionary) + { + YT_VERIFY(DigestedDictionary_); + } + + ~TDigestedDecompressionDictionary() + { + ZSTD_freeDDict(DigestedDictionary_); + } + + i64 GetMemoryUsage() const override + { + return ZSTD_sizeof_DDict(DigestedDictionary_); + } + + ZSTD_DDict* GetDigestedDictionary() const + { + return DigestedDictionary_; + } + +private: + ZSTD_DDict* const DigestedDictionary_; +}; + +DEFINE_REFCOUNTED_TYPE(TDigestedDecompressionDictionary) + +//////////////////////////////////////////////////////////////////////////////// + +class TDictionaryCompressionContextGuard + : private TMoveOnly +{ +public: + TDictionaryCompressionContextGuard() + : Context_(ZSTD_createCCtx()) + { } + + TDictionaryCompressionContextGuard(TDictionaryCompressionContextGuard&& other) + : Context_(other.Context_) + { + other.Context_ = nullptr; + } + + TDictionaryCompressionContextGuard& operator = (TDictionaryCompressionContextGuard&& other) = delete; + + ~TDictionaryCompressionContextGuard() + { + if (Context_) { + ZSTD_freeCCtx(Context_); + } + } + + ZSTD_CCtx* GetContext() const + { + return Context_; + } + +private: + ZSTD_CCtx* Context_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TDictionaryDecompressionContextGuard + : private TMoveOnly +{ +public: + TDictionaryDecompressionContextGuard() + : Context_(ZSTD_createDCtx()) + { } + + TDictionaryDecompressionContextGuard(TDictionaryDecompressionContextGuard&& other) + : Context_(other.Context_) + { + other.Context_ = nullptr; + } + + TDictionaryDecompressionContextGuard& operator = (TDictionaryDecompressionContextGuard&& other) = delete; + + ~TDictionaryDecompressionContextGuard() + { + if (Context_) { + ZSTD_freeDCtx(Context_); + } + } + + ZSTD_DCtx* GetContext() const + { + return Context_; + } + +private: + ZSTD_DCtx* Context_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TDictionaryCompressor + : public IDictionaryCompressor +{ +public: + TDictionaryCompressor( + TDictionaryCompressionContextGuard context, + TDigestedCompressionDictionaryPtr digestedDictionary) + : Context_(std::move(context)) + , DigestedDictionary_(std::move(digestedDictionary)) + { } + + TRef Compress( + TChunkedMemoryPool* pool, + TRef input) override + { + auto maxSize = ZSTD_compressBound(input.Size()); + char* output = pool->AllocateUnaligned(maxSize); + + auto actualSize = ZSTD_compress2( + Context_.GetContext(), + output, + maxSize, + input.Begin(), + input.Size()); + VerifyError(actualSize); + YT_VERIFY(actualSize <= maxSize); + pool->Free(output + actualSize, output + maxSize); + + return TRef(output, actualSize); + } + +private: + const TDictionaryCompressionContextGuard Context_; + const TDigestedCompressionDictionaryPtr DigestedDictionary_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TDictionaryDecompressor + : public IDictionaryDecompressor +{ +public: + TDictionaryDecompressor( + TDictionaryDecompressionContextGuard context, + TDigestedDecompressionDictionaryPtr digestedDictionary) + : Context_(std::move(context)) + , DigestedDictionary_(std::move(digestedDictionary)) + { } + + void Decompress( + TRef input, + TMutableRef output) override + { + auto decompressedSize = ZSTD_decompressDCtx( + Context_.GetContext(), + output.Begin(), + output.Size(), + input.Begin(), + input.Size()); + VerifyError(decompressedSize); + YT_VERIFY(decompressedSize == output.Size()); + } + +private: + const TDictionaryDecompressionContextGuard Context_; + const TDigestedDecompressionDictionaryPtr DigestedDictionary_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TCompressionDictionaryBlobTag +{ }; + +struct TCompressionDictionarySamplesBlobTag +{ }; + +TErrorOr<TSharedRef> ZstdTrainCompressionDictionary(i64 dictionarySize, const std::vector<TSharedRef>& samples) +{ + TBlob dictionary( + GetRefCountedTypeCookie<TCompressionDictionaryBlobTag>(), + /*size*/ dictionarySize, + /*initiailizeStorage*/ false, + /*pageAligned*/ true); + + std::vector<size_t> sampleSizes; + sampleSizes.reserve(samples.size()); + for (const auto& sample : samples) { + sampleSizes.push_back(sample.size()); + } + + auto mergedSamples = samples.size() == 1 + ? samples[0] + : MergeRefsToRef<TCompressionDictionarySamplesBlobTag>(samples); + + auto resultDictionarySize = ZDICT_trainFromBuffer( + dictionary.Begin(), + dictionarySize, + mergedSamples.Begin(), + sampleSizes.data(), + sampleSizes.size()); + if (ZSTD_isError(resultDictionarySize)) { + auto error = TError("Compression dictionary training failed") + << TErrorAttribute("zstd_error_code", static_cast<int>(ZSTD_getErrorCode(resultDictionarySize))) + << TErrorAttribute("zstd_error_name", ZSTD_getErrorName(resultDictionarySize)); + YT_LOG_DEBUG(error); + return error; + } + + YT_VERIFY(resultDictionarySize <= dictionary.Size()); + dictionary.Resize(resultDictionarySize); + + return TSharedRef::FromBlob(std::move(dictionary)); +} + +//////////////////////////////////////////////////////////////////////////////// + +IDigestedCompressionDictionaryPtr ZstdCreateDigestedCompressionDictionary( + const TSharedRef& compressionDictionary, + int compressionLevel) +{ + YT_VERIFY(compressionDictionary); + + auto* digestedDictionary = ZSTD_createCDict( + compressionDictionary.Begin(), + compressionDictionary.Size(), + compressionLevel); + return New<TDigestedCompressionDictionary>(digestedDictionary); +} + +IDigestedDecompressionDictionaryPtr ZstdCreateDigestedDecompressionDictionary( + const TSharedRef& compressionDictionary) +{ + YT_VERIFY(compressionDictionary); + + auto* digestedDictionary = ZSTD_createDDict( + compressionDictionary.Begin(), + compressionDictionary.Size()); + return New<TDigestedDecompressionDictionary>(digestedDictionary); +} + +//////////////////////////////////////////////////////////////////////////////// + +IDictionaryCompressorPtr ZstdCreateDictionaryCompressor( + const IDigestedCompressionDictionaryPtr& digestedCompressionDictionary) +{ + YT_VERIFY(digestedCompressionDictionary); + auto* typedDictionary = dynamic_cast<TDigestedCompressionDictionary*>(digestedCompressionDictionary.Get()); + YT_VERIFY(typedDictionary); + + TDictionaryCompressionContextGuard context; + + // Omit writing dictID and checksum to frame header. + ZSTD_frameParameters frameParameters{ + .contentSizeFlag = 1, + .checksumFlag = 0, + .noDictIDFlag = 1, + }; + VerifyError(ZSTD_CCtx_setFParams(context.GetContext(), frameParameters)); + + // Omit writing magic number to frame header. + // NB: This parameter must remain intact for compressor-decompressor compatibility. + VerifyError(ZSTD_CCtx_setParameter(context.GetContext(), ZSTD_c_format, ZSTD_f_zstd1_magicless)); + + // Omit copying digested dictionary content. + VerifyError(ZSTD_CCtx_refCDict(context.GetContext(), typedDictionary->GetDigestedDictionary())); + + return New<TDictionaryCompressor>(std::move(context), typedDictionary); +} + +IDictionaryDecompressorPtr ZstdCreateDictionaryDecompressor( + const IDigestedDecompressionDictionaryPtr& digestedDecompressionDictionary) +{ + YT_VERIFY(digestedDecompressionDictionary); + auto* typedDictionary = dynamic_cast<TDigestedDecompressionDictionary*>(digestedDecompressionDictionary.Get()); + YT_VERIFY(typedDictionary); + + TDictionaryDecompressionContextGuard context; + + VerifyError(ZSTD_DCtx_setParameter(context.GetContext(), ZSTD_d_format, ZSTD_f_zstd1_magicless)); + + VerifyError(ZSTD_DCtx_refDDict(context.GetContext(), typedDictionary->GetDigestedDictionary())); + + return New<TDictionaryDecompressor>(std::move(context), typedDictionary); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NCompression::NDetail diff --git a/yt/yt/core/compression/zstd.h b/yt/yt/core/compression/zstd.h index d812be590d..c8e4ab8b2f 100644 --- a/yt/yt/core/compression/zstd.h +++ b/yt/yt/core/compression/zstd.h @@ -1,5 +1,6 @@ #pragma once +#include "public.h" #include "stream.h" namespace NYT::NCompression::NDetail { @@ -11,5 +12,34 @@ void ZstdDecompress(TSource* source, TBlob* output); //////////////////////////////////////////////////////////////////////////////// -} // namespace NYT::NCompression::NDetail +int ZstdGetMinDictionarySize(); + +int ZstdGetMaxCompressionLevel(); +int ZstdGetDefaultCompressionLevel(); + +//////////////////////////////////////////////////////////////////////////////// + +//! See codec.h for clarification on these functions. + +TErrorOr<TSharedRef> ZstdTrainCompressionDictionary( + i64 dictionarySize, + const std::vector<TSharedRef>& samples); + +IDictionaryCompressorPtr ZstdCreateDictionaryCompressor( + const IDigestedCompressionDictionaryPtr& digestedCompressionDictionary); +IDictionaryDecompressorPtr ZstdCreateDictionaryDecompressor( + const IDigestedDecompressionDictionaryPtr& digestedDecompressionDictionary); + +IDigestedCompressionDictionaryPtr ZstdCreateDigestedCompressionDictionary( + const TSharedRef& compressionDictionary, + int compressionLevel); + +IDigestedDecompressionDictionaryPtr ZstdCreateDigestedDecompressionDictionary( + const TSharedRef& compressionDictionary); + +TDictionaryCompressionFrameInfo ZstdGetFrameInfo(TRef input); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NCompression::NDetail diff --git a/yt/yt/core/misc/serialize-inl.h b/yt/yt/core/misc/serialize-inl.h index 4546621a37..5269014db1 100644 --- a/yt/yt/core/misc/serialize-inl.h +++ b/yt/yt/core/misc/serialize-inl.h @@ -125,7 +125,7 @@ void ReadPod(TInput& input, T& obj) } template <class T> -void ReadPod(char*& ptr, T& obj) +void ReadPod(const char*& ptr, T& obj) { static_assert(TTypeTraits<T>::IsPod || std::is_trivial_v<T>, "T must be a pod-type."); memcpy(&obj, ptr, sizeof(obj)); diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index 0dc4666a01..456d013d1d 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -32,6 +32,7 @@ SRCS( compression/brotli.cpp compression/bzip2.cpp compression/codec.cpp + compression/dictionary_codec.cpp compression/stream.cpp compression/lz.cpp compression/lzma.cpp |