aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorakozhikhov <akozhikhov@yandex-team.com>2023-12-11 02:53:55 +0300
committerakozhikhov <akozhikhov@yandex-team.com>2023-12-11 03:14:58 +0300
commit81ca8992fffd775c006627b82c211b30ea6f5bb2 (patch)
tree18b4072d8353c710cd78185728580d9a5464ea5a
parentdc0db37252e88d8f0af234fd17e1e95622f6b123 (diff)
downloadydb-81ca8992fffd775c006627b82c211b30ea6f5bb2.tar.gz
YT-20593: Support Zstd dictionary compression in yt/core
-rw-r--r--yt/yt/core/CMakeLists.darwin-arm64.txt1
-rw-r--r--yt/yt/core/CMakeLists.darwin-x86_64.txt1
-rw-r--r--yt/yt/core/CMakeLists.linux-aarch64.txt1
-rw-r--r--yt/yt/core/CMakeLists.linux-x86_64.txt1
-rw-r--r--yt/yt/core/CMakeLists.windows-x86_64.txt1
-rw-r--r--yt/yt/core/compression/codec.cpp1
-rw-r--r--yt/yt/core/compression/codec.h1
-rw-r--r--yt/yt/core/compression/dictionary_codec.cpp79
-rw-r--r--yt/yt/core/compression/dictionary_codec.h112
-rw-r--r--yt/yt/core/compression/public.h6
-rw-r--r--yt/yt/core/compression/unittests/dictionary_compression_ut.cpp240
-rw-r--r--yt/yt/core/compression/unittests/ya.make1
-rw-r--r--yt/yt/core/compression/zstd.cpp386
-rw-r--r--yt/yt/core/compression/zstd.h32
-rw-r--r--yt/yt/core/misc/serialize-inl.h2
-rw-r--r--yt/yt/core/ya.make1
16 files changed, 855 insertions, 11 deletions
diff --git a/yt/yt/core/CMakeLists.darwin-arm64.txt b/yt/yt/core/CMakeLists.darwin-arm64.txt
index 21ed07c2d3..192a65f028 100644
--- a/yt/yt/core/CMakeLists.darwin-arm64.txt
+++ b/yt/yt/core/CMakeLists.darwin-arm64.txt
@@ -80,6 +80,7 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/brotli.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/bzip2.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/codec.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/dictionary_codec.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/stream.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lz.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lzma.cpp
diff --git a/yt/yt/core/CMakeLists.darwin-x86_64.txt b/yt/yt/core/CMakeLists.darwin-x86_64.txt
index 7a6160415a..0c1266e936 100644
--- a/yt/yt/core/CMakeLists.darwin-x86_64.txt
+++ b/yt/yt/core/CMakeLists.darwin-x86_64.txt
@@ -81,6 +81,7 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/brotli.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/bzip2.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/codec.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/dictionary_codec.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/stream.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lz.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lzma.cpp
diff --git a/yt/yt/core/CMakeLists.linux-aarch64.txt b/yt/yt/core/CMakeLists.linux-aarch64.txt
index fb60bd26a5..d2a2d8b067 100644
--- a/yt/yt/core/CMakeLists.linux-aarch64.txt
+++ b/yt/yt/core/CMakeLists.linux-aarch64.txt
@@ -81,6 +81,7 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/brotli.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/bzip2.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/codec.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/dictionary_codec.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/stream.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lz.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lzma.cpp
diff --git a/yt/yt/core/CMakeLists.linux-x86_64.txt b/yt/yt/core/CMakeLists.linux-x86_64.txt
index 02cb5341e7..66f859aadc 100644
--- a/yt/yt/core/CMakeLists.linux-x86_64.txt
+++ b/yt/yt/core/CMakeLists.linux-x86_64.txt
@@ -82,6 +82,7 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/brotli.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/bzip2.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/codec.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/dictionary_codec.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/stream.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lz.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lzma.cpp
diff --git a/yt/yt/core/CMakeLists.windows-x86_64.txt b/yt/yt/core/CMakeLists.windows-x86_64.txt
index dba46ac9ef..6c965d6cbf 100644
--- a/yt/yt/core/CMakeLists.windows-x86_64.txt
+++ b/yt/yt/core/CMakeLists.windows-x86_64.txt
@@ -80,6 +80,7 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/brotli.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/bzip2.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/codec.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/compression/dictionary_codec.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/stream.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lz.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/compression/lzma.cpp
diff --git a/yt/yt/core/compression/codec.cpp b/yt/yt/core/compression/codec.cpp
index 36e789e26d..3edaae9de3 100644
--- a/yt/yt/core/compression/codec.cpp
+++ b/yt/yt/core/compression/codec.cpp
@@ -454,4 +454,3 @@ const std::vector<ECodec>& GetSupportedCodecIds()
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NCompression
-
diff --git a/yt/yt/core/compression/codec.h b/yt/yt/core/compression/codec.h
index 57483645e5..ec102e6a55 100644
--- a/yt/yt/core/compression/codec.h
+++ b/yt/yt/core/compression/codec.h
@@ -40,4 +40,3 @@ const std::vector<ECodec>& GetSupportedCodecIds();
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NCompression
-
diff --git a/yt/yt/core/compression/dictionary_codec.cpp b/yt/yt/core/compression/dictionary_codec.cpp
new file mode 100644
index 0000000000..08b5ff2da7
--- /dev/null
+++ b/yt/yt/core/compression/dictionary_codec.cpp
@@ -0,0 +1,79 @@
+#include "dictionary_codec.h"
+#include "zstd.h"
+
+namespace NYT::NCompression {
+
+using namespace NDetail;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TZstdDictionaryCompressionCodec
+ : public IDictionaryCompressionCodec
+{
+public:
+ int GetMinDictionarySize() const override
+ {
+ return ZstdGetMinDictionarySize();
+ }
+
+ int GetMaxCompressionLevel() const override
+ {
+ return ZstdGetMaxCompressionLevel();
+ }
+
+ int GetDefaultCompressionLevel() const override
+ {
+ return ZstdGetDefaultCompressionLevel();
+ }
+
+ TErrorOr<TSharedRef> TrainCompressionDictionary(
+ i64 dictionarySize,
+ const std::vector<TSharedRef>& samples) const override
+ {
+ return ZstdTrainCompressionDictionary(dictionarySize, samples);
+ }
+
+ IDictionaryCompressorPtr CreateDictionaryCompressor(
+ const IDigestedCompressionDictionaryPtr& digestedCompressionDictionary) const override
+ {
+ return ZstdCreateDictionaryCompressor(digestedCompressionDictionary);
+ }
+
+ IDictionaryDecompressorPtr CreateDictionaryDecompressor(
+ const IDigestedDecompressionDictionaryPtr& digestedDecompressionDictionary) const override
+ {
+ return ZstdCreateDictionaryDecompressor(digestedDecompressionDictionary);
+ }
+
+ IDigestedCompressionDictionaryPtr CreateDigestedCompressionDictionary(
+ const TSharedRef& compressionDictionary,
+ int compressionLevel) const override
+ {
+ return ZstdCreateDigestedCompressionDictionary(
+ compressionDictionary,
+ compressionLevel);
+ }
+
+ IDigestedDecompressionDictionaryPtr CreateDigestedDecompressionDictionary(
+ const TSharedRef& compressionDictionary) const override
+ {
+ return ZstdCreateDigestedDecompressionDictionary(compressionDictionary);
+ }
+
+ TDictionaryCompressionFrameInfo GetFrameInfo(TRef input) const override
+ {
+ return ZstdGetFrameInfo(input);
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IDictionaryCompressionCodec* GetDictionaryCompressionCodec()
+{
+ static TZstdDictionaryCompressionCodec result;
+ return &result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NCompression
diff --git a/yt/yt/core/compression/dictionary_codec.h b/yt/yt/core/compression/dictionary_codec.h
new file mode 100644
index 0000000000..8205fa2c50
--- /dev/null
+++ b/yt/yt/core/compression/dictionary_codec.h
@@ -0,0 +1,112 @@
+#pragma once
+
+#include "public.h"
+
+#include <library/cpp/yt/memory/ref.h>
+
+namespace NYT::NCompression {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Each compression frame (i.e. blob compressed via single call to IDictionaryCompressor) contains this header.
+struct TDictionaryCompressionFrameInfo
+{
+ ui64 ContentSize;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Compressor interface that is aware of compression context.
+//! Thread affinity: single-threaded.
+struct IDictionaryCompressor
+ : public TRefCounted
+{
+ //! Returns ref to compressed data. Memory will be allocated via #pool.
+ virtual TRef Compress(
+ TChunkedMemoryPool* pool,
+ TRef input) = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IDictionaryCompressor)
+
+//! Decompressor interface that is aware of decompression context.
+//! Thread affinity: single-threaded.
+struct IDictionaryDecompressor
+ : public TRefCounted
+{
+ //! Decompresses #input into #ouput.
+ //! Memory for output must be pre-allocated, its size can be infered from frame info.
+ virtual void Decompress(
+ TRef input,
+ TMutableRef output) = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IDictionaryDecompressor)
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Dictionary digested and ready for compression.
+//! Stores preprocessed dictionary data and can be used to create instance of IDictionaryCompressor.
+//! May be used concurrently from multiple compressors.
+//! Thread affinity: any.
+struct IDigestedCompressionDictionary
+ : public TRefCounted
+{
+ virtual i64 GetMemoryUsage() const = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IDigestedCompressionDictionary)
+
+//! Dictionary digested and ready for decompression.
+//! Stores preprocessed dictionary data and can be used to create instance of IDictionaryDecompressor.
+//! May be used concurrently from multiple decompressors.
+//! Thread affinity: any.
+struct IDigestedDecompressionDictionary
+ : public TRefCounted
+{
+ virtual i64 GetMemoryUsage() const = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IDigestedDecompressionDictionary)
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IDictionaryCompressionCodec
+{
+ virtual int GetMinDictionarySize() const = 0;
+
+ virtual int GetMaxCompressionLevel() const = 0;
+ virtual int GetDefaultCompressionLevel() const = 0;
+
+ //! Trains compression dictionary of size not exceeding #dictionarySize.
+ //! This dicionary may then be digested for (de)compression.
+ //! NB: May return null if training failed, e.g. due to lack of #samples
+ //! or no sufficient profit from using dictionary on them.
+ virtual TErrorOr<TSharedRef> TrainCompressionDictionary(
+ i64 dictionarySize,
+ const std::vector<TSharedRef>& samples) const = 0;
+
+ //! NB: Digested dictionary data will not be copied.
+ //! (De)compressor will reference digested dictionary for safe access.
+ virtual IDictionaryCompressorPtr CreateDictionaryCompressor(
+ const IDigestedCompressionDictionaryPtr& digestedCompressionDictionary) const = 0;
+ virtual IDictionaryDecompressorPtr CreateDictionaryDecompressor(
+ const IDigestedDecompressionDictionaryPtr& digestedDecompressionDictionary) const = 0;
+
+ // NB: Raw #compressionDictionary data will be copied and stored within digested dictionary in a preprocessed form.
+ //! #compressionLevel determines compression level that will be applied for each compression with that dictionary later on.
+ virtual IDigestedCompressionDictionaryPtr CreateDigestedCompressionDictionary(
+ const TSharedRef& compressionDictionary,
+ int compressionLevel) const = 0;
+ virtual IDigestedDecompressionDictionaryPtr CreateDigestedDecompressionDictionary(
+ const TSharedRef& compressionDictionary) const = 0;
+
+ //! Parses header of compressed frame #input and returns specified frame info.
+ virtual TDictionaryCompressionFrameInfo GetFrameInfo(TRef input) const = 0;
+};
+
+IDictionaryCompressionCodec* GetDictionaryCompressionCodec();
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NCompression
diff --git a/yt/yt/core/compression/public.h b/yt/yt/core/compression/public.h
index c0921f70a8..7e8c585c63 100644
--- a/yt/yt/core/compression/public.h
+++ b/yt/yt/core/compression/public.h
@@ -8,6 +8,12 @@ namespace NYT::NCompression {
struct ICodec;
+struct TDictionaryCompressionFrameInfo;
+DECLARE_REFCOUNTED_STRUCT(IDictionaryCompressor)
+DECLARE_REFCOUNTED_STRUCT(IDictionaryDecompressor)
+DECLARE_REFCOUNTED_STRUCT(IDigestedCompressionDictionary)
+DECLARE_REFCOUNTED_STRUCT(IDigestedDecompressionDictionary)
+
DEFINE_AMBIGUOUS_ENUM_WITH_UNDERLYING_TYPE(ECodec, i8,
((None) (0))
((Snappy) (1))
diff --git a/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp b/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp
new file mode 100644
index 0000000000..c96d1bc40c
--- /dev/null
+++ b/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp
@@ -0,0 +1,240 @@
+#include <yt/yt/core/test_framework/framework.h>
+
+#include <yt/yt/core/compression/dictionary_codec.h>
+
+#include <yt/yt/core/misc/blob.h>
+#include <yt/yt/core/misc/error.h>
+#include <yt/yt/core/misc/serialize.h>
+
+#include <library/cpp/yt/memory/chunked_memory_pool.h>
+
+#include <util/random/fast.h>
+
+#include <contrib/libs/zstd/lib/zstd_errors.h>
+#define ZSTD_STATIC_LINKING_ONLY
+#include <contrib/libs/zstd/include/zstd.h>
+
+namespace NYT::NCompression {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TCompressionContext
+{
+ IDictionaryCompressorPtr Compressor;
+ IDictionaryDecompressorPtr Decompressor;
+};
+
+class TDictionaryCompressionTest
+ : public ::testing::Test
+{
+protected:
+ TDictionaryCompressionTest()
+ : Rng_(100)
+ { }
+
+ void SetUp() override
+ {
+ FixedSequence_ = GenerateRandomSequence(100);
+ YT_VERIFY(FixedSequence_.Size() == 100);
+ for (int sampleIndex = 0; sampleIndex < 1000; ++sampleIndex) {
+ Samples_.push_back(DoGenerateSample());
+ }
+
+ CompressionDictionary_ = GetDictionaryCompressionCodec()->TrainCompressionDictionary(
+ 2 * GetDictionaryCompressionCodec()->GetMinDictionarySize(),
+ Samples_);
+ }
+
+ const std::vector<TSharedRef>& GetSamples() const
+ {
+ return Samples_;
+ }
+
+ TSharedRef GenerateNewSample()
+ {
+ return DoGenerateSample();
+ }
+
+ const TErrorOr<TSharedRef>& GetCompressionDictionary() const
+ {
+ return CompressionDictionary_;
+ }
+
+ TCompressionContext CreateCompressionContext() const
+ {
+ auto digestedCompressionDictionary = GetDictionaryCompressionCodec()->CreateDigestedCompressionDictionary(
+ GetCompressionDictionary().Value(),
+ GetDictionaryCompressionCodec()->GetDefaultCompressionLevel());
+ auto compressor = GetDictionaryCompressionCodec()->CreateDictionaryCompressor(digestedCompressionDictionary);
+
+ auto digestedDecompressionDictionary = GetDictionaryCompressionCodec()->CreateDigestedDecompressionDictionary(
+ GetCompressionDictionary().Value());
+ auto decompressor = GetDictionaryCompressionCodec()->CreateDictionaryDecompressor(digestedDecompressionDictionary);
+
+ // NB: We do not need to store digested dictionaries as they must be referenced within (de)compressor.
+ return {
+ .Compressor = compressor,
+ .Decompressor = decompressor,
+ };
+ }
+
+private:
+ TFastRng64 Rng_;
+
+ TSharedRef FixedSequence_;
+ std::vector<TSharedRef> Samples_;
+ TErrorOr<TSharedRef> CompressionDictionary_;
+
+
+ TSharedRef GenerateRandomSequence(int size)
+ {
+ int alignedSize = AlignUp<int>(size, 8);
+ TBlob blob(GetRefCountedTypeCookie<TDefaultBlobTag>(), alignedSize);
+ auto* ptr = blob.Begin();
+ while (ptr - blob.Begin() < alignedSize) {
+ auto value = Rng_.GenRand();
+ WritePod(ptr, value);
+ }
+
+ blob.Resize(size);
+ return TSharedRef::FromBlob(std::move(blob));
+ }
+
+ TSharedRef DoGenerateSample()
+ {
+ int segmentCount = Rng_.GenRand() % 4 + 1;
+ std::vector<int> segmentSizes(segmentCount);
+ for (int segmentIndex = 0; segmentIndex < segmentCount; ++segmentIndex) {
+ segmentSizes[segmentIndex] = std::max<int>(1, Rng_.GenRand() % FixedSequence_.Size());
+ }
+
+ TBlob blob(
+ GetRefCountedTypeCookie<TDefaultBlobTag>(),
+ std::accumulate(segmentSizes.begin(), segmentSizes.end(), 0));
+ auto* ptr = blob.Begin();
+
+ for (int segmentIndex = 0; segmentIndex < segmentCount; ++segmentIndex) {
+ if (Rng_.GenRand() % 2 == 0) {
+ WriteRef(ptr, TRef(FixedSequence_.Begin(), segmentSizes[segmentIndex]));
+ } else {
+ auto randomSequence = GenerateRandomSequence(segmentSizes[segmentIndex]);
+ WriteRef(ptr, randomSequence);
+ }
+ }
+ YT_VERIFY(ptr == blob.End());
+
+ return TSharedRef::FromBlob(std::move(blob));
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST_F(TDictionaryCompressionTest, TestMinDictionaryCompressionSize)
+{
+ EXPECT_TRUE(GetCompressionDictionary().IsOK());
+
+ auto dictionarySize = GetDictionaryCompressionCodec()->GetMinDictionarySize() >> 1;
+ EXPECT_GT(dictionarySize, 0);
+ auto dictionaryOrError = GetDictionaryCompressionCodec()->TrainCompressionDictionary(
+ dictionarySize,
+ GetSamples());
+ EXPECT_FALSE(dictionaryOrError.IsOK());
+
+ auto errorAttribute = dictionaryOrError.Attributes().Find<int>("zstd_error_code");
+ EXPECT_TRUE(errorAttribute);
+ EXPECT_EQ(ZSTD_ErrorCode::ZSTD_error_dstSize_tooSmall, errorAttribute);
+}
+
+TEST_F(TDictionaryCompressionTest, TestCompressDecompressSimple)
+{
+ EXPECT_TRUE(GetCompressionDictionary().IsOK());
+
+ TChunkedMemoryPool pool;
+ auto context = CreateCompressionContext();
+
+ auto data = GenerateNewSample();
+ EXPECT_LT(0, std::ssize(data));
+ auto compressedData = context.Compressor->Compress(&pool, data);
+ auto* decompressedData = pool.AllocateUnaligned(data.Size());
+ context.Decompressor->Decompress(compressedData, TMutableRef(decompressedData, data.Size()));
+
+ EXPECT_EQ(TStringBuf(data.Begin(), data.Size()), TStringBuf(decompressedData, data.Size()));
+}
+
+TEST_F(TDictionaryCompressionTest, TestFrameInfo)
+{
+ EXPECT_TRUE(GetCompressionDictionary().IsOK());
+
+ TChunkedMemoryPool pool;
+ auto context = CreateCompressionContext();
+
+ auto data = GenerateNewSample();
+ auto compressedData = context.Compressor->Compress(&pool, data);
+ // This is fine as long as seed is fixed.
+ EXPECT_LT(compressedData.Size(), data.Size());
+
+ auto frameInfo = GetDictionaryCompressionCodec()->GetFrameInfo(compressedData);
+ EXPECT_EQ(data.Size(), frameInfo.ContentSize);
+}
+
+TEST_F(TDictionaryCompressionTest, TestCompressionRatio)
+{
+ EXPECT_TRUE(GetCompressionDictionary().IsOK());
+
+ TChunkedMemoryPool pool;
+ auto context = CreateCompressionContext();
+
+ int compressedSize = 0;
+ int decompressedSize = 0;
+ for (int index = 0; index < 1000; ++index) {
+ auto data = GenerateNewSample();
+ auto compressedData = context.Compressor->Compress(&pool, data);
+ auto* decompressedData = pool.AllocateUnaligned(data.Size());
+ context.Decompressor->Decompress(compressedData, TMutableRef(decompressedData, data.Size()));
+ EXPECT_EQ(TStringBuf(data.Begin(), data.Size()), TStringBuf(decompressedData, data.Size()));
+
+ compressedSize += compressedData.Size();
+ decompressedSize += data.Size();
+ }
+
+ EXPECT_LT(compressedSize, 0.8 * decompressedSize);
+}
+
+TEST_F(TDictionaryCompressionTest, ModifiedFrameHeader)
+{
+ EXPECT_TRUE(GetCompressionDictionary().IsOK());
+
+ TChunkedMemoryPool pool;
+ auto context = CreateCompressionContext();
+
+ auto data = GenerateNewSample();
+ auto compressedData = context.Compressor->Compress(&pool, data);
+
+ ZSTD_frameHeader frameHeader;
+ auto result = ZSTD_getFrameHeader_advanced(
+ &frameHeader,
+ compressedData.Begin(),
+ compressedData.Size(),
+ ZSTD_f_zstd1);
+ EXPECT_TRUE(ZSTD_isError(result));
+ // Magic number shall not be written to the frame.
+ EXPECT_EQ(ZSTD_ErrorCode::ZSTD_error_prefix_unknown, ZSTD_getErrorCode(result));
+
+ result = ZSTD_getFrameHeader_advanced(
+ &frameHeader,
+ compressedData.Begin(),
+ compressedData.Size(),
+ ZSTD_f_zstd1_magicless);
+ EXPECT_FALSE(ZSTD_isError(result));
+ // These are explicitly disabled.
+ EXPECT_EQ(0u, frameHeader.dictID);
+ EXPECT_EQ(0u, frameHeader.checksumFlag);
+ // We do not use other frame types.
+ EXPECT_EQ(ZSTD_frame, frameHeader.frameType);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NCompression
diff --git a/yt/yt/core/compression/unittests/ya.make b/yt/yt/core/compression/unittests/ya.make
index 079474b5ef..ec506bdc6f 100644
--- a/yt/yt/core/compression/unittests/ya.make
+++ b/yt/yt/core/compression/unittests/ya.make
@@ -10,6 +10,7 @@ PROTO_NAMESPACE(yt)
SRCS(
codec_ut.cpp
+ dictionary_compression_ut.cpp
stream_ut.cpp
)
diff --git a/yt/yt/core/compression/zstd.cpp b/yt/yt/core/compression/zstd.cpp
index 3ee31b31ac..24351eaca9 100644
--- a/yt/yt/core/compression/zstd.cpp
+++ b/yt/yt/core/compression/zstd.cpp
@@ -1,11 +1,18 @@
#include "zstd.h"
+#include "codec.h"
+#include "dictionary_codec.h"
#include "private.h"
#include <yt/yt/core/misc/blob.h>
#include <yt/yt/core/misc/finally.h>
+#include <library/cpp/yt/memory/chunked_memory_pool.h>
+
+#include <contrib/libs/zstd/lib/zstd_errors.h>
#define ZSTD_STATIC_LINKING_ONLY
#include <contrib/libs/zstd/include/zstd.h>
+#define ZDICT_STATIC_LINKING_ONLY
+#include <contrib/libs/zstd/include/zdict.h>
namespace NYT::NCompression::NDetail {
@@ -20,6 +27,15 @@ struct TZstdCompressBufferTag
////////////////////////////////////////////////////////////////////////////////
+void VerifyError(size_t result)
+{
+ YT_LOG_FATAL_IF(ZSTD_isError(result),
+ "Zstd compression failed (Error: %v)",
+ ZSTD_getErrorName(result));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void ZstdCompress(int level, TSource* source, TBlob* output)
{
ui64 totalInputSize = source->Available();
@@ -33,11 +49,6 @@ void ZstdCompress(int level, TSource* source, TBlob* output)
curOutputPos += sizeof(totalInputSize);
}
- auto checkError = [] (size_t result) {
- YT_LOG_FATAL_IF(ZSTD_isError(result), "Zstd compression failed (Error: %v)",
- ZSTD_getErrorName(result));
- };
-
auto context = ZSTD_createCCtx();
auto contextGuard = Finally([&] () {
ZSTD_freeCCtx(context);
@@ -45,7 +56,7 @@ void ZstdCompress(int level, TSource* source, TBlob* output)
{
auto result = ZSTD_CCtx_setParameter(context, ZSTD_c_compressionLevel, level);
- checkError(result);
+ VerifyError(result);
}
struct CompressResult
@@ -61,7 +72,7 @@ void ZstdCompress(int level, TSource* source, TBlob* output)
ZSTD_outBuffer zstdOutput = {output->Begin(), output->Size(), curOutputPos};
size_t result = ZSTD_compressStream2(context, &zstdOutput, &zstdInput, mode);
- checkError(result);
+ VerifyError(result);
curOutputPos = zstdOutput.pos;
return CompressResult{
@@ -173,5 +184,366 @@ void ZstdDecompress(TSource* source, TBlob* output)
////////////////////////////////////////////////////////////////////////////////
+int ZstdGetMinDictionarySize()
+{
+ return ZDICT_DICTSIZE_MIN;
+}
+
+int ZstdGetMaxCompressionLevel()
+{
+ return ZSTD_maxCLevel();
+}
+
+int ZstdGetDefaultCompressionLevel()
+{
+ return ZSTD_defaultCLevel();
+}
+
+TDictionaryCompressionFrameInfo ZstdGetFrameInfo(TRef input)
+{
+ YT_VERIFY(input.Size() >= ZSTD_FRAMEHEADERSIZE_MIN(ZSTD_f_zstd1_magicless));
+
+ ZSTD_frameHeader frameHeader;
+ auto result = ZSTD_getFrameHeader_advanced(
+ &frameHeader,
+ input.Begin(),
+ input.Size(),
+ ZSTD_f_zstd1_magicless);
+ YT_VERIFY(result == 0);
+
+ return {
+ .ContentSize = frameHeader.frameContentSize,
+ };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+DECLARE_REFCOUNTED_CLASS(TDigestedCompressionDictionary)
+
+class TDigestedCompressionDictionary
+ : public IDigestedCompressionDictionary
+ , private TNonCopyable
+{
+public:
+ TDigestedCompressionDictionary(ZSTD_CDict* digestedDictionary)
+ : DigestedDictionary_(digestedDictionary)
+ {
+ YT_VERIFY(DigestedDictionary_);
+ }
+
+ ~TDigestedCompressionDictionary()
+ {
+ ZSTD_freeCDict(DigestedDictionary_);
+ }
+
+ i64 GetMemoryUsage() const override
+ {
+ return ZSTD_sizeof_CDict(DigestedDictionary_);
+ }
+
+ ZSTD_CDict* GetDigestedDictionary() const
+ {
+ return DigestedDictionary_;
+ }
+
+private:
+ ZSTD_CDict* const DigestedDictionary_;
+};
+
+DEFINE_REFCOUNTED_TYPE(TDigestedCompressionDictionary)
+
+////////////////////////////////////////////////////////////////////////////////
+
+DECLARE_REFCOUNTED_CLASS(TDigestedDecompressionDictionary)
+
+class TDigestedDecompressionDictionary
+ : public IDigestedDecompressionDictionary
+ , private TNonCopyable
+{
+public:
+ TDigestedDecompressionDictionary(ZSTD_DDict* digestedDictionary)
+ : DigestedDictionary_(digestedDictionary)
+ {
+ YT_VERIFY(DigestedDictionary_);
+ }
+
+ ~TDigestedDecompressionDictionary()
+ {
+ ZSTD_freeDDict(DigestedDictionary_);
+ }
+
+ i64 GetMemoryUsage() const override
+ {
+ return ZSTD_sizeof_DDict(DigestedDictionary_);
+ }
+
+ ZSTD_DDict* GetDigestedDictionary() const
+ {
+ return DigestedDictionary_;
+ }
+
+private:
+ ZSTD_DDict* const DigestedDictionary_;
+};
+
+DEFINE_REFCOUNTED_TYPE(TDigestedDecompressionDictionary)
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDictionaryCompressionContextGuard
+ : private TMoveOnly
+{
+public:
+ TDictionaryCompressionContextGuard()
+ : Context_(ZSTD_createCCtx())
+ { }
+
+ TDictionaryCompressionContextGuard(TDictionaryCompressionContextGuard&& other)
+ : Context_(other.Context_)
+ {
+ other.Context_ = nullptr;
+ }
+
+ TDictionaryCompressionContextGuard& operator = (TDictionaryCompressionContextGuard&& other) = delete;
+
+ ~TDictionaryCompressionContextGuard()
+ {
+ if (Context_) {
+ ZSTD_freeCCtx(Context_);
+ }
+ }
+
+ ZSTD_CCtx* GetContext() const
+ {
+ return Context_;
+ }
+
+private:
+ ZSTD_CCtx* Context_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDictionaryDecompressionContextGuard
+ : private TMoveOnly
+{
+public:
+ TDictionaryDecompressionContextGuard()
+ : Context_(ZSTD_createDCtx())
+ { }
+
+ TDictionaryDecompressionContextGuard(TDictionaryDecompressionContextGuard&& other)
+ : Context_(other.Context_)
+ {
+ other.Context_ = nullptr;
+ }
+
+ TDictionaryDecompressionContextGuard& operator = (TDictionaryDecompressionContextGuard&& other) = delete;
+
+ ~TDictionaryDecompressionContextGuard()
+ {
+ if (Context_) {
+ ZSTD_freeDCtx(Context_);
+ }
+ }
+
+ ZSTD_DCtx* GetContext() const
+ {
+ return Context_;
+ }
+
+private:
+ ZSTD_DCtx* Context_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDictionaryCompressor
+ : public IDictionaryCompressor
+{
+public:
+ TDictionaryCompressor(
+ TDictionaryCompressionContextGuard context,
+ TDigestedCompressionDictionaryPtr digestedDictionary)
+ : Context_(std::move(context))
+ , DigestedDictionary_(std::move(digestedDictionary))
+ { }
+
+ TRef Compress(
+ TChunkedMemoryPool* pool,
+ TRef input) override
+ {
+ auto maxSize = ZSTD_compressBound(input.Size());
+ char* output = pool->AllocateUnaligned(maxSize);
+
+ auto actualSize = ZSTD_compress2(
+ Context_.GetContext(),
+ output,
+ maxSize,
+ input.Begin(),
+ input.Size());
+ VerifyError(actualSize);
+ YT_VERIFY(actualSize <= maxSize);
+ pool->Free(output + actualSize, output + maxSize);
+
+ return TRef(output, actualSize);
+ }
+
+private:
+ const TDictionaryCompressionContextGuard Context_;
+ const TDigestedCompressionDictionaryPtr DigestedDictionary_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDictionaryDecompressor
+ : public IDictionaryDecompressor
+{
+public:
+ TDictionaryDecompressor(
+ TDictionaryDecompressionContextGuard context,
+ TDigestedDecompressionDictionaryPtr digestedDictionary)
+ : Context_(std::move(context))
+ , DigestedDictionary_(std::move(digestedDictionary))
+ { }
+
+ void Decompress(
+ TRef input,
+ TMutableRef output) override
+ {
+ auto decompressedSize = ZSTD_decompressDCtx(
+ Context_.GetContext(),
+ output.Begin(),
+ output.Size(),
+ input.Begin(),
+ input.Size());
+ VerifyError(decompressedSize);
+ YT_VERIFY(decompressedSize == output.Size());
+ }
+
+private:
+ const TDictionaryDecompressionContextGuard Context_;
+ const TDigestedDecompressionDictionaryPtr DigestedDictionary_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TCompressionDictionaryBlobTag
+{ };
+
+struct TCompressionDictionarySamplesBlobTag
+{ };
+
+TErrorOr<TSharedRef> ZstdTrainCompressionDictionary(i64 dictionarySize, const std::vector<TSharedRef>& samples)
+{
+ TBlob dictionary(
+ GetRefCountedTypeCookie<TCompressionDictionaryBlobTag>(),
+ /*size*/ dictionarySize,
+ /*initiailizeStorage*/ false,
+ /*pageAligned*/ true);
+
+ std::vector<size_t> sampleSizes;
+ sampleSizes.reserve(samples.size());
+ for (const auto& sample : samples) {
+ sampleSizes.push_back(sample.size());
+ }
+
+ auto mergedSamples = samples.size() == 1
+ ? samples[0]
+ : MergeRefsToRef<TCompressionDictionarySamplesBlobTag>(samples);
+
+ auto resultDictionarySize = ZDICT_trainFromBuffer(
+ dictionary.Begin(),
+ dictionarySize,
+ mergedSamples.Begin(),
+ sampleSizes.data(),
+ sampleSizes.size());
+ if (ZSTD_isError(resultDictionarySize)) {
+ auto error = TError("Compression dictionary training failed")
+ << TErrorAttribute("zstd_error_code", static_cast<int>(ZSTD_getErrorCode(resultDictionarySize)))
+ << TErrorAttribute("zstd_error_name", ZSTD_getErrorName(resultDictionarySize));
+ YT_LOG_DEBUG(error);
+ return error;
+ }
+
+ YT_VERIFY(resultDictionarySize <= dictionary.Size());
+ dictionary.Resize(resultDictionarySize);
+
+ return TSharedRef::FromBlob(std::move(dictionary));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+IDigestedCompressionDictionaryPtr ZstdCreateDigestedCompressionDictionary(
+ const TSharedRef& compressionDictionary,
+ int compressionLevel)
+{
+ YT_VERIFY(compressionDictionary);
+
+ auto* digestedDictionary = ZSTD_createCDict(
+ compressionDictionary.Begin(),
+ compressionDictionary.Size(),
+ compressionLevel);
+ return New<TDigestedCompressionDictionary>(digestedDictionary);
+}
+
+IDigestedDecompressionDictionaryPtr ZstdCreateDigestedDecompressionDictionary(
+ const TSharedRef& compressionDictionary)
+{
+ YT_VERIFY(compressionDictionary);
+
+ auto* digestedDictionary = ZSTD_createDDict(
+ compressionDictionary.Begin(),
+ compressionDictionary.Size());
+ return New<TDigestedDecompressionDictionary>(digestedDictionary);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+IDictionaryCompressorPtr ZstdCreateDictionaryCompressor(
+ const IDigestedCompressionDictionaryPtr& digestedCompressionDictionary)
+{
+ YT_VERIFY(digestedCompressionDictionary);
+ auto* typedDictionary = dynamic_cast<TDigestedCompressionDictionary*>(digestedCompressionDictionary.Get());
+ YT_VERIFY(typedDictionary);
+
+ TDictionaryCompressionContextGuard context;
+
+ // Omit writing dictID and checksum to frame header.
+ ZSTD_frameParameters frameParameters{
+ .contentSizeFlag = 1,
+ .checksumFlag = 0,
+ .noDictIDFlag = 1,
+ };
+ VerifyError(ZSTD_CCtx_setFParams(context.GetContext(), frameParameters));
+
+ // Omit writing magic number to frame header.
+ // NB: This parameter must remain intact for compressor-decompressor compatibility.
+ VerifyError(ZSTD_CCtx_setParameter(context.GetContext(), ZSTD_c_format, ZSTD_f_zstd1_magicless));
+
+ // Omit copying digested dictionary content.
+ VerifyError(ZSTD_CCtx_refCDict(context.GetContext(), typedDictionary->GetDigestedDictionary()));
+
+ return New<TDictionaryCompressor>(std::move(context), typedDictionary);
+}
+
+IDictionaryDecompressorPtr ZstdCreateDictionaryDecompressor(
+ const IDigestedDecompressionDictionaryPtr& digestedDecompressionDictionary)
+{
+ YT_VERIFY(digestedDecompressionDictionary);
+ auto* typedDictionary = dynamic_cast<TDigestedDecompressionDictionary*>(digestedDecompressionDictionary.Get());
+ YT_VERIFY(typedDictionary);
+
+ TDictionaryDecompressionContextGuard context;
+
+ VerifyError(ZSTD_DCtx_setParameter(context.GetContext(), ZSTD_d_format, ZSTD_f_zstd1_magicless));
+
+ VerifyError(ZSTD_DCtx_refDDict(context.GetContext(), typedDictionary->GetDigestedDictionary()));
+
+ return New<TDictionaryDecompressor>(std::move(context), typedDictionary);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NCompression::NDetail
diff --git a/yt/yt/core/compression/zstd.h b/yt/yt/core/compression/zstd.h
index d812be590d..c8e4ab8b2f 100644
--- a/yt/yt/core/compression/zstd.h
+++ b/yt/yt/core/compression/zstd.h
@@ -1,5 +1,6 @@
#pragma once
+#include "public.h"
#include "stream.h"
namespace NYT::NCompression::NDetail {
@@ -11,5 +12,34 @@ void ZstdDecompress(TSource* source, TBlob* output);
////////////////////////////////////////////////////////////////////////////////
-} // namespace NYT::NCompression::NDetail
+int ZstdGetMinDictionarySize();
+
+int ZstdGetMaxCompressionLevel();
+int ZstdGetDefaultCompressionLevel();
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! See codec.h for clarification on these functions.
+
+TErrorOr<TSharedRef> ZstdTrainCompressionDictionary(
+ i64 dictionarySize,
+ const std::vector<TSharedRef>& samples);
+
+IDictionaryCompressorPtr ZstdCreateDictionaryCompressor(
+ const IDigestedCompressionDictionaryPtr& digestedCompressionDictionary);
+IDictionaryDecompressorPtr ZstdCreateDictionaryDecompressor(
+ const IDigestedDecompressionDictionaryPtr& digestedDecompressionDictionary);
+
+IDigestedCompressionDictionaryPtr ZstdCreateDigestedCompressionDictionary(
+ const TSharedRef& compressionDictionary,
+ int compressionLevel);
+
+IDigestedDecompressionDictionaryPtr ZstdCreateDigestedDecompressionDictionary(
+ const TSharedRef& compressionDictionary);
+
+TDictionaryCompressionFrameInfo ZstdGetFrameInfo(TRef input);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NCompression::NDetail
diff --git a/yt/yt/core/misc/serialize-inl.h b/yt/yt/core/misc/serialize-inl.h
index 4546621a37..5269014db1 100644
--- a/yt/yt/core/misc/serialize-inl.h
+++ b/yt/yt/core/misc/serialize-inl.h
@@ -125,7 +125,7 @@ void ReadPod(TInput& input, T& obj)
}
template <class T>
-void ReadPod(char*& ptr, T& obj)
+void ReadPod(const char*& ptr, T& obj)
{
static_assert(TTypeTraits<T>::IsPod || std::is_trivial_v<T>, "T must be a pod-type.");
memcpy(&obj, ptr, sizeof(obj));
diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make
index 0dc4666a01..456d013d1d 100644
--- a/yt/yt/core/ya.make
+++ b/yt/yt/core/ya.make
@@ -32,6 +32,7 @@ SRCS(
compression/brotli.cpp
compression/bzip2.cpp
compression/codec.cpp
+ compression/dictionary_codec.cpp
compression/stream.cpp
compression/lz.cpp
compression/lzma.cpp