aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorakozhikhov <akozhikhov@yandex-team.com>2024-11-28 20:41:57 +0300
committerakozhikhov <akozhikhov@yandex-team.com>2024-11-28 21:07:12 +0300
commit8b2cd268e9cc93e17143f759bd7c7b4300b2f7c0 (patch)
treeaa083dbb12889dab6d6d89d6dd49e2c038ba0192
parentde077773020afae2efc3905f4a88bdb8726efb59 (diff)
downloadydb-8b2cd268e9cc93e17143f759bd7c7b4300b2f7c0.tar.gz
YT-23398: When digesting compression dictionary make memory management more explicit
commit_hash:8b61a4f302590ecce3069f4dbe797125997147de
-rw-r--r--yt/yt/core/compression/dictionary_codec.cpp25
-rw-r--r--yt/yt/core/compression/dictionary_codec.h19
-rw-r--r--yt/yt/core/compression/unittests/dictionary_compression_ut.cpp25
-rw-r--r--yt/yt/core/compression/zstd.cpp104
-rw-r--r--yt/yt/core/compression/zstd.h12
5 files changed, 137 insertions, 48 deletions
diff --git a/yt/yt/core/compression/dictionary_codec.cpp b/yt/yt/core/compression/dictionary_codec.cpp
index 08b5ff2da7..36b158a70b 100644
--- a/yt/yt/core/compression/dictionary_codec.cpp
+++ b/yt/yt/core/compression/dictionary_codec.cpp
@@ -45,19 +45,34 @@ public:
return ZstdCreateDictionaryDecompressor(digestedDecompressionDictionary);
}
- IDigestedCompressionDictionaryPtr CreateDigestedCompressionDictionary(
+ i64 EstimateDigestedCompressionDictionarySize(i64 dictionarySize, int compressionLevel) const override
+ {
+ return ZstdEstimateDigestedCompressionDictionarySize(dictionarySize, compressionLevel);
+ }
+
+ i64 EstimateDigestedDecompressionDictionarySize(i64 dictionarySize) const override
+ {
+ return ZstdEstimateDigestedDecompressionDictionarySize(dictionarySize);
+ }
+
+ IDigestedCompressionDictionaryPtr ConstructDigestedCompressionDictionary(
const TSharedRef& compressionDictionary,
+ TSharedMutableRef storage,
int compressionLevel) const override
{
- return ZstdCreateDigestedCompressionDictionary(
+ return ZstdConstructDigestedCompressionDictionary(
compressionDictionary,
+ std::move(storage),
compressionLevel);
}
- IDigestedDecompressionDictionaryPtr CreateDigestedDecompressionDictionary(
- const TSharedRef& compressionDictionary) const override
+ IDigestedDecompressionDictionaryPtr ConstructDigestedDecompressionDictionary(
+ const TSharedRef& decompressionDictionary,
+ TSharedMutableRef storage) const override
{
- return ZstdCreateDigestedDecompressionDictionary(compressionDictionary);
+ return ZstdConstructDigestedDecompressionDictionary(
+ decompressionDictionary,
+ std::move(storage));
}
TDictionaryCompressionFrameInfo GetFrameInfo(TRef input) const override
diff --git a/yt/yt/core/compression/dictionary_codec.h b/yt/yt/core/compression/dictionary_codec.h
index fc36b4812d..214d938103 100644
--- a/yt/yt/core/compression/dictionary_codec.h
+++ b/yt/yt/core/compression/dictionary_codec.h
@@ -95,14 +95,25 @@ struct IDictionaryCompressionCodec
virtual IDictionaryDecompressorPtr CreateDictionaryDecompressor(
const IDigestedDecompressionDictionaryPtr& digestedDecompressionDictionary) const = 0;
- //! NB: Raw #compressionDictionary data will be copied and stored within digested dictionary in a preprocessed form.
+ //! NB: These functions provide means for creating digested (de)compression dictionary,
+ //! i.e. a special in-memory representation of a dictionary that can then be used within (de)compressor
+ //! for fast (de)compression of string values of any size.
+ //! First one must get estimation on size of the digested dictionary with one of the two methods below,
+ //! so memory blob of at least that size can be provided to one of the two construction methods.
//! #compressionLevel determines compression level that will be applied for each compression with that dictionary later on.
+ virtual i64 EstimateDigestedCompressionDictionarySize(i64 dictionarySize, int compressionLevel) const = 0;
+ virtual i64 EstimateDigestedDecompressionDictionarySize(i64 dictionarySize) const = 0;
+ //! NB: Raw #compressionDictionary data will be copied and stored within digested dictionary in a preprocessed form,
+ //! so the memory corresponding to #compressionDictionary can be freed. Digested dictionary will own the memory,
+ //! referenced by #storage.
//! These methods may throw.
- virtual IDigestedCompressionDictionaryPtr CreateDigestedCompressionDictionary(
+ virtual IDigestedCompressionDictionaryPtr ConstructDigestedCompressionDictionary(
const TSharedRef& compressionDictionary,
+ TSharedMutableRef storage,
int compressionLevel) const = 0;
- virtual IDigestedDecompressionDictionaryPtr CreateDigestedDecompressionDictionary(
- const TSharedRef& compressionDictionary) const = 0;
+ virtual IDigestedDecompressionDictionaryPtr ConstructDigestedDecompressionDictionary(
+ const TSharedRef& decompressionDictionary,
+ TSharedMutableRef storage) const = 0;
//! Parses header of compressed frame #input and returns specified frame info.
//! This method may throw.
diff --git a/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp b/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp
index 5eb10dee42..2d00080b0d 100644
--- a/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp
+++ b/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp
@@ -65,13 +65,30 @@ protected:
TCompressionContext CreateCompressionContext() const
{
- auto digestedCompressionDictionary = GetDictionaryCompressionCodec()->CreateDigestedCompressionDictionary(
- GetCompressionDictionary().Value(),
+ auto dictionary = GetCompressionDictionary().Value();
+
+ auto dictionarySize = GetDictionaryCompressionCodec()->EstimateDigestedCompressionDictionarySize(
+ dictionary.Size(),
+ GetDictionaryCompressionCodec()->GetDefaultCompressionLevel());
+ auto storage = TSharedMutableRef::Allocate(
+ dictionarySize,
+ { .InitializeStorage = false });
+
+ auto digestedCompressionDictionary = GetDictionaryCompressionCodec()->ConstructDigestedCompressionDictionary(
+ dictionary,
+ storage,
GetDictionaryCompressionCodec()->GetDefaultCompressionLevel());
auto compressor = GetDictionaryCompressionCodec()->CreateDictionaryCompressor(digestedCompressionDictionary);
- auto digestedDecompressionDictionary = GetDictionaryCompressionCodec()->CreateDigestedDecompressionDictionary(
- GetCompressionDictionary().Value());
+ dictionarySize = GetDictionaryCompressionCodec()->EstimateDigestedDecompressionDictionarySize(
+ dictionary.Size());
+ storage = TSharedMutableRef::Allocate(
+ dictionarySize,
+ { .InitializeStorage = false });
+
+ auto digestedDecompressionDictionary = GetDictionaryCompressionCodec()->ConstructDigestedDecompressionDictionary(
+ dictionary,
+ storage);
auto decompressor = GetDictionaryCompressionCodec()->CreateDictionaryDecompressor(digestedDecompressionDictionary);
// NB: We do not need to store digested dictionaries as they must be referenced within (de)compressor.
diff --git a/yt/yt/core/compression/zstd.cpp b/yt/yt/core/compression/zstd.cpp
index 8772de735b..ba8a71aca0 100644
--- a/yt/yt/core/compression/zstd.cpp
+++ b/yt/yt/core/compression/zstd.cpp
@@ -241,29 +241,31 @@ class TDigestedCompressionDictionary
, private TNonCopyable
{
public:
- explicit TDigestedCompressionDictionary(ZSTD_CDict* digestedDictionary)
- : DigestedDictionary_(digestedDictionary)
+ explicit TDigestedCompressionDictionary(
+ TSharedRef storage,
+ const ZSTD_CDict* digestedDictionary)
+ : Storage_(std::move(storage))
+ , DigestedDictionary_(digestedDictionary)
{
+ YT_VERIFY(Storage_);
YT_VERIFY(DigestedDictionary_);
}
- ~TDigestedCompressionDictionary()
- {
- ZSTD_freeCDict(DigestedDictionary_);
- }
-
i64 GetMemoryUsage() const override
{
return ZSTD_sizeof_CDict(DigestedDictionary_);
}
- ZSTD_CDict* GetDigestedDictionary() const
+ const ZSTD_CDict* GetDigestedDictionary() const
{
return DigestedDictionary_;
}
private:
- ZSTD_CDict* const DigestedDictionary_;
+ //! NB: DigestedDictionary is initialized over preallocated memory referenced by Storage.
+ //! For that reason we must assure that Storage lifetime is longer and also do not need to call ZSTD_freeCDict.
+ const TSharedRef Storage_;
+ const ZSTD_CDict* const DigestedDictionary_;
};
DEFINE_REFCOUNTED_TYPE(TDigestedCompressionDictionary)
@@ -277,29 +279,31 @@ class TDigestedDecompressionDictionary
, private TNonCopyable
{
public:
- explicit TDigestedDecompressionDictionary(ZSTD_DDict* digestedDictionary)
- : DigestedDictionary_(digestedDictionary)
+ explicit TDigestedDecompressionDictionary(
+ TSharedRef storage,
+ const ZSTD_DDict* digestedDictionary)
+ : Storage_(std::move(storage))
+ , DigestedDictionary_(digestedDictionary)
{
+ YT_VERIFY(Storage_);
YT_VERIFY(DigestedDictionary_);
}
- ~TDigestedDecompressionDictionary()
- {
- ZSTD_freeDDict(DigestedDictionary_);
- }
-
i64 GetMemoryUsage() const override
{
return ZSTD_sizeof_DDict(DigestedDictionary_);
}
- ZSTD_DDict* GetDigestedDictionary() const
+ const ZSTD_DDict* GetDigestedDictionary() const
{
return DigestedDictionary_;
}
private:
- ZSTD_DDict* const DigestedDictionary_;
+ //! NB: DigestedDictionary is initialized over preallocated memory referenced by Storage.
+ //! For that reason we must assure that Storage lifetime is longer and also do not need to call ZSTD_freeDDict.
+ const TSharedRef Storage_;
+ const ZSTD_DDict* const DigestedDictionary_;
};
DEFINE_REFCOUNTED_TYPE(TDigestedDecompressionDictionary)
@@ -488,38 +492,76 @@ TErrorOr<TSharedRef> ZstdTrainCompressionDictionary(i64 dictionarySize, const st
////////////////////////////////////////////////////////////////////////////////
-IDigestedCompressionDictionaryPtr ZstdCreateDigestedCompressionDictionary(
+i64 ZstdEstimateDigestedCompressionDictionarySize(i64 dictionarySize, int compressionLevel)
+{
+ return ZSTD_estimateCDictSize(dictionarySize, compressionLevel);
+}
+
+i64 ZstdEstimateDigestedDecompressionDictionarySize(i64 dictionarySize)
+{
+ return ZSTD_estimateDDictSize(dictionarySize, ZSTD_dlm_byCopy);
+}
+
+IDigestedCompressionDictionaryPtr ZstdConstructDigestedCompressionDictionary(
const TSharedRef& compressionDictionary,
+ TSharedMutableRef storage,
int compressionLevel)
{
YT_VERIFY(compressionDictionary);
+ YT_VERIFY(storage);
YT_VERIFY(compressionLevel >= 0 && compressionLevel <= ZstdGetMaxCompressionLevel());
+ YT_VERIFY(AlignUp(storage.Begin(), 8ull) == storage.Begin());
+
+ // XXX(akozhikhov): We have to assign #srcSizeHint here to 1 instead of 0 (as a default unknown value)
+ // because of the mode that this function sets upon calling its internal implementation.
+ // According to documentation 'ZSTD_cParamMode_e::ZSTD_cpm_createCDict' and 'ZSTD_cParamMode_e::ZSTD_cpm_unknown' seems to
+ // have the same effect, however for some reason this is not true in case of zero #srcSizeHint.
+ auto compressionParams = ZSTD_getCParams(
+ compressionLevel,
+ /*srcSizeHint*/ 1,
+ compressionDictionary.Size());
- auto* digestedDictionary = ZSTD_createCDict(
+ auto* digestedDictionary = ZSTD_initStaticCDict(
+ storage.Begin(),
+ storage.Size(),
compressionDictionary.Begin(),
compressionDictionary.Size(),
- compressionLevel);
+ ZSTD_dlm_byCopy,
+ ZSTD_dct_auto,
+ compressionParams);
+
if (!digestedDictionary) {
THROW_ERROR_EXCEPTION("Failed to create digested compression dictionary")
<< TErrorAttribute("compression_level", compressionLevel)
- << TErrorAttribute("dictionary_size", compressionDictionary.Size());
+ << TErrorAttribute("dictionary_size", compressionDictionary.Size())
+ << TErrorAttribute("storage_size", storage.Size());
}
- return New<TDigestedCompressionDictionary>(digestedDictionary);
+
+ return New<TDigestedCompressionDictionary>(std::move(storage), digestedDictionary);
}
-IDigestedDecompressionDictionaryPtr ZstdCreateDigestedDecompressionDictionary(
- const TSharedRef& compressionDictionary)
+IDigestedDecompressionDictionaryPtr ZstdConstructDigestedDecompressionDictionary(
+ const TSharedRef& decompressionDictionary,
+ TSharedMutableRef storage)
{
- YT_VERIFY(compressionDictionary);
+ YT_VERIFY(decompressionDictionary);
+ YT_VERIFY(storage);
+
+ auto* digestedDictionary = ZSTD_initStaticDDict(
+ storage.Begin(),
+ storage.Size(),
+ decompressionDictionary.Begin(),
+ decompressionDictionary.Size(),
+ ZSTD_dlm_byCopy,
+ ZSTD_dct_auto);
- auto* digestedDictionary = ZSTD_createDDict(
- compressionDictionary.Begin(),
- compressionDictionary.Size());
if (!digestedDictionary) {
THROW_ERROR_EXCEPTION("Failed to create digested decompression dictionary")
- << TErrorAttribute("dictionary_size", compressionDictionary.Size());
+ << TErrorAttribute("dictionary_size", decompressionDictionary.Size())
+ << TErrorAttribute("storage_size", storage.Size());
}
- return New<TDigestedDecompressionDictionary>(digestedDictionary);
+
+ return New<TDigestedDecompressionDictionary>(std::move(storage), digestedDictionary);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/compression/zstd.h b/yt/yt/core/compression/zstd.h
index c8e4ab8b2f..ef58708e8f 100644
--- a/yt/yt/core/compression/zstd.h
+++ b/yt/yt/core/compression/zstd.h
@@ -31,12 +31,16 @@ IDictionaryCompressorPtr ZstdCreateDictionaryCompressor(
IDictionaryDecompressorPtr ZstdCreateDictionaryDecompressor(
const IDigestedDecompressionDictionaryPtr& digestedDecompressionDictionary);
-IDigestedCompressionDictionaryPtr ZstdCreateDigestedCompressionDictionary(
+i64 ZstdEstimateDigestedCompressionDictionarySize(i64 dictionarySize, int compressionLevel);
+i64 ZstdEstimateDigestedDecompressionDictionarySize(i64 dictionarySize);
+
+IDigestedCompressionDictionaryPtr ZstdConstructDigestedCompressionDictionary(
const TSharedRef& compressionDictionary,
+ TSharedMutableRef storage,
int compressionLevel);
-
-IDigestedDecompressionDictionaryPtr ZstdCreateDigestedDecompressionDictionary(
- const TSharedRef& compressionDictionary);
+IDigestedDecompressionDictionaryPtr ZstdConstructDigestedDecompressionDictionary(
+ const TSharedRef& decompressionDictionary,
+ TSharedMutableRef storage);
TDictionaryCompressionFrameInfo ZstdGetFrameInfo(TRef input);