diff options
author | akozhikhov <akozhikhov@yandex-team.com> | 2024-11-28 20:41:57 +0300 |
---|---|---|
committer | akozhikhov <akozhikhov@yandex-team.com> | 2024-11-28 21:07:12 +0300 |
commit | 8b2cd268e9cc93e17143f759bd7c7b4300b2f7c0 (patch) | |
tree | aa083dbb12889dab6d6d89d6dd49e2c038ba0192 | |
parent | de077773020afae2efc3905f4a88bdb8726efb59 (diff) | |
download | ydb-8b2cd268e9cc93e17143f759bd7c7b4300b2f7c0.tar.gz |
YT-23398: When digesting compression dictionary make memory management more explicit
commit_hash:8b61a4f302590ecce3069f4dbe797125997147de
-rw-r--r-- | yt/yt/core/compression/dictionary_codec.cpp | 25 | ||||
-rw-r--r-- | yt/yt/core/compression/dictionary_codec.h | 19 | ||||
-rw-r--r-- | yt/yt/core/compression/unittests/dictionary_compression_ut.cpp | 25 | ||||
-rw-r--r-- | yt/yt/core/compression/zstd.cpp | 104 | ||||
-rw-r--r-- | yt/yt/core/compression/zstd.h | 12 |
5 files changed, 137 insertions, 48 deletions
diff --git a/yt/yt/core/compression/dictionary_codec.cpp b/yt/yt/core/compression/dictionary_codec.cpp index 08b5ff2da7..36b158a70b 100644 --- a/yt/yt/core/compression/dictionary_codec.cpp +++ b/yt/yt/core/compression/dictionary_codec.cpp @@ -45,19 +45,34 @@ public: return ZstdCreateDictionaryDecompressor(digestedDecompressionDictionary); } - IDigestedCompressionDictionaryPtr CreateDigestedCompressionDictionary( + i64 EstimateDigestedCompressionDictionarySize(i64 dictionarySize, int compressionLevel) const override + { + return ZstdEstimateDigestedCompressionDictionarySize(dictionarySize, compressionLevel); + } + + i64 EstimateDigestedDecompressionDictionarySize(i64 dictionarySize) const override + { + return ZstdEstimateDigestedDecompressionDictionarySize(dictionarySize); + } + + IDigestedCompressionDictionaryPtr ConstructDigestedCompressionDictionary( const TSharedRef& compressionDictionary, + TSharedMutableRef storage, int compressionLevel) const override { - return ZstdCreateDigestedCompressionDictionary( + return ZstdConstructDigestedCompressionDictionary( compressionDictionary, + std::move(storage), compressionLevel); } - IDigestedDecompressionDictionaryPtr CreateDigestedDecompressionDictionary( - const TSharedRef& compressionDictionary) const override + IDigestedDecompressionDictionaryPtr ConstructDigestedDecompressionDictionary( + const TSharedRef& decompressionDictionary, + TSharedMutableRef storage) const override { - return ZstdCreateDigestedDecompressionDictionary(compressionDictionary); + return ZstdConstructDigestedDecompressionDictionary( + decompressionDictionary, + std::move(storage)); } TDictionaryCompressionFrameInfo GetFrameInfo(TRef input) const override diff --git a/yt/yt/core/compression/dictionary_codec.h b/yt/yt/core/compression/dictionary_codec.h index fc36b4812d..214d938103 100644 --- a/yt/yt/core/compression/dictionary_codec.h +++ b/yt/yt/core/compression/dictionary_codec.h @@ -95,14 +95,25 @@ struct IDictionaryCompressionCodec virtual IDictionaryDecompressorPtr CreateDictionaryDecompressor( const IDigestedDecompressionDictionaryPtr& digestedDecompressionDictionary) const = 0; - //! NB: Raw #compressionDictionary data will be copied and stored within digested dictionary in a preprocessed form. + //! NB: These functions provide means for creating digested (de)compression dictionary, + //! i.e. a special in-memory representation of a dictionary that can then be used within (de)compressor + //! for fast (de)compression of string values of any size. + //! First one must get estimation on size of the digested dictionary with one of the two methods below, + //! so memory blob of at least that size can be provided to one of the two construction methods. //! #compressionLevel determines compression level that will be applied for each compression with that dictionary later on. + virtual i64 EstimateDigestedCompressionDictionarySize(i64 dictionarySize, int compressionLevel) const = 0; + virtual i64 EstimateDigestedDecompressionDictionarySize(i64 dictionarySize) const = 0; + //! NB: Raw #compressionDictionary data will be copied and stored within digested dictionary in a preprocessed form, + //! so the memory corresponding to #compressionDictionary can be freed. Digested dictionary will own the memory, + //! referenced by #storage. //! These methods may throw. - virtual IDigestedCompressionDictionaryPtr CreateDigestedCompressionDictionary( + virtual IDigestedCompressionDictionaryPtr ConstructDigestedCompressionDictionary( const TSharedRef& compressionDictionary, + TSharedMutableRef storage, int compressionLevel) const = 0; - virtual IDigestedDecompressionDictionaryPtr CreateDigestedDecompressionDictionary( - const TSharedRef& compressionDictionary) const = 0; + virtual IDigestedDecompressionDictionaryPtr ConstructDigestedDecompressionDictionary( + const TSharedRef& decompressionDictionary, + TSharedMutableRef storage) const = 0; //! Parses header of compressed frame #input and returns specified frame info. //! This method may throw. diff --git a/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp b/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp index 5eb10dee42..2d00080b0d 100644 --- a/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp +++ b/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp @@ -65,13 +65,30 @@ protected: TCompressionContext CreateCompressionContext() const { - auto digestedCompressionDictionary = GetDictionaryCompressionCodec()->CreateDigestedCompressionDictionary( - GetCompressionDictionary().Value(), + auto dictionary = GetCompressionDictionary().Value(); + + auto dictionarySize = GetDictionaryCompressionCodec()->EstimateDigestedCompressionDictionarySize( + dictionary.Size(), + GetDictionaryCompressionCodec()->GetDefaultCompressionLevel()); + auto storage = TSharedMutableRef::Allocate( + dictionarySize, + { .InitializeStorage = false }); + + auto digestedCompressionDictionary = GetDictionaryCompressionCodec()->ConstructDigestedCompressionDictionary( + dictionary, + storage, GetDictionaryCompressionCodec()->GetDefaultCompressionLevel()); auto compressor = GetDictionaryCompressionCodec()->CreateDictionaryCompressor(digestedCompressionDictionary); - auto digestedDecompressionDictionary = GetDictionaryCompressionCodec()->CreateDigestedDecompressionDictionary( - GetCompressionDictionary().Value()); + dictionarySize = GetDictionaryCompressionCodec()->EstimateDigestedDecompressionDictionarySize( + dictionary.Size()); + storage = TSharedMutableRef::Allocate( + dictionarySize, + { .InitializeStorage = false }); + + auto digestedDecompressionDictionary = GetDictionaryCompressionCodec()->ConstructDigestedDecompressionDictionary( + dictionary, + storage); auto decompressor = GetDictionaryCompressionCodec()->CreateDictionaryDecompressor(digestedDecompressionDictionary); // NB: We do not need to store digested dictionaries as they must be referenced within (de)compressor. diff --git a/yt/yt/core/compression/zstd.cpp b/yt/yt/core/compression/zstd.cpp index 8772de735b..ba8a71aca0 100644 --- a/yt/yt/core/compression/zstd.cpp +++ b/yt/yt/core/compression/zstd.cpp @@ -241,29 +241,31 @@ class TDigestedCompressionDictionary , private TNonCopyable { public: - explicit TDigestedCompressionDictionary(ZSTD_CDict* digestedDictionary) - : DigestedDictionary_(digestedDictionary) + explicit TDigestedCompressionDictionary( + TSharedRef storage, + const ZSTD_CDict* digestedDictionary) + : Storage_(std::move(storage)) + , DigestedDictionary_(digestedDictionary) { + YT_VERIFY(Storage_); YT_VERIFY(DigestedDictionary_); } - ~TDigestedCompressionDictionary() - { - ZSTD_freeCDict(DigestedDictionary_); - } - i64 GetMemoryUsage() const override { return ZSTD_sizeof_CDict(DigestedDictionary_); } - ZSTD_CDict* GetDigestedDictionary() const + const ZSTD_CDict* GetDigestedDictionary() const { return DigestedDictionary_; } private: - ZSTD_CDict* const DigestedDictionary_; + //! NB: DigestedDictionary is initialized over preallocated memory referenced by Storage. + //! For that reason we must assure that Storage lifetime is longer and also do not need to call ZSTD_freeCDict. + const TSharedRef Storage_; + const ZSTD_CDict* const DigestedDictionary_; }; DEFINE_REFCOUNTED_TYPE(TDigestedCompressionDictionary) @@ -277,29 +279,31 @@ class TDigestedDecompressionDictionary , private TNonCopyable { public: - explicit TDigestedDecompressionDictionary(ZSTD_DDict* digestedDictionary) - : DigestedDictionary_(digestedDictionary) + explicit TDigestedDecompressionDictionary( + TSharedRef storage, + const ZSTD_DDict* digestedDictionary) + : Storage_(std::move(storage)) + , DigestedDictionary_(digestedDictionary) { + YT_VERIFY(Storage_); YT_VERIFY(DigestedDictionary_); } - ~TDigestedDecompressionDictionary() - { - ZSTD_freeDDict(DigestedDictionary_); - } - i64 GetMemoryUsage() const override { return ZSTD_sizeof_DDict(DigestedDictionary_); } - ZSTD_DDict* GetDigestedDictionary() const + const ZSTD_DDict* GetDigestedDictionary() const { return DigestedDictionary_; } private: - ZSTD_DDict* const DigestedDictionary_; + //! NB: DigestedDictionary is initialized over preallocated memory referenced by Storage. + //! For that reason we must assure that Storage lifetime is longer and also do not need to call ZSTD_freeDDict. + const TSharedRef Storage_; + const ZSTD_DDict* const DigestedDictionary_; }; DEFINE_REFCOUNTED_TYPE(TDigestedDecompressionDictionary) @@ -488,38 +492,76 @@ TErrorOr<TSharedRef> ZstdTrainCompressionDictionary(i64 dictionarySize, const st //////////////////////////////////////////////////////////////////////////////// -IDigestedCompressionDictionaryPtr ZstdCreateDigestedCompressionDictionary( +i64 ZstdEstimateDigestedCompressionDictionarySize(i64 dictionarySize, int compressionLevel) +{ + return ZSTD_estimateCDictSize(dictionarySize, compressionLevel); +} + +i64 ZstdEstimateDigestedDecompressionDictionarySize(i64 dictionarySize) +{ + return ZSTD_estimateDDictSize(dictionarySize, ZSTD_dlm_byCopy); +} + +IDigestedCompressionDictionaryPtr ZstdConstructDigestedCompressionDictionary( const TSharedRef& compressionDictionary, + TSharedMutableRef storage, int compressionLevel) { YT_VERIFY(compressionDictionary); + YT_VERIFY(storage); YT_VERIFY(compressionLevel >= 0 && compressionLevel <= ZstdGetMaxCompressionLevel()); + YT_VERIFY(AlignUp(storage.Begin(), 8ull) == storage.Begin()); + + // XXX(akozhikhov): We have to assign #srcSizeHint here to 1 instead of 0 (as a default unknown value) + // because of the mode that this function sets upon calling its internal implementation. + // According to documentation 'ZSTD_cParamMode_e::ZSTD_cpm_createCDict' and 'ZSTD_cParamMode_e::ZSTD_cpm_unknown' seems to + // have the same effect, however for some reason this is not true in case of zero #srcSizeHint. + auto compressionParams = ZSTD_getCParams( + compressionLevel, + /*srcSizeHint*/ 1, + compressionDictionary.Size()); - auto* digestedDictionary = ZSTD_createCDict( + auto* digestedDictionary = ZSTD_initStaticCDict( + storage.Begin(), + storage.Size(), compressionDictionary.Begin(), compressionDictionary.Size(), - compressionLevel); + ZSTD_dlm_byCopy, + ZSTD_dct_auto, + compressionParams); + if (!digestedDictionary) { THROW_ERROR_EXCEPTION("Failed to create digested compression dictionary") << TErrorAttribute("compression_level", compressionLevel) - << TErrorAttribute("dictionary_size", compressionDictionary.Size()); + << TErrorAttribute("dictionary_size", compressionDictionary.Size()) + << TErrorAttribute("storage_size", storage.Size()); } - return New<TDigestedCompressionDictionary>(digestedDictionary); + + return New<TDigestedCompressionDictionary>(std::move(storage), digestedDictionary); } -IDigestedDecompressionDictionaryPtr ZstdCreateDigestedDecompressionDictionary( - const TSharedRef& compressionDictionary) +IDigestedDecompressionDictionaryPtr ZstdConstructDigestedDecompressionDictionary( + const TSharedRef& decompressionDictionary, + TSharedMutableRef storage) { - YT_VERIFY(compressionDictionary); + YT_VERIFY(decompressionDictionary); + YT_VERIFY(storage); + + auto* digestedDictionary = ZSTD_initStaticDDict( + storage.Begin(), + storage.Size(), + decompressionDictionary.Begin(), + decompressionDictionary.Size(), + ZSTD_dlm_byCopy, + ZSTD_dct_auto); - auto* digestedDictionary = ZSTD_createDDict( - compressionDictionary.Begin(), - compressionDictionary.Size()); if (!digestedDictionary) { THROW_ERROR_EXCEPTION("Failed to create digested decompression dictionary") - << TErrorAttribute("dictionary_size", compressionDictionary.Size()); + << TErrorAttribute("dictionary_size", decompressionDictionary.Size()) + << TErrorAttribute("storage_size", storage.Size()); } - return New<TDigestedDecompressionDictionary>(digestedDictionary); + + return New<TDigestedDecompressionDictionary>(std::move(storage), digestedDictionary); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/compression/zstd.h b/yt/yt/core/compression/zstd.h index c8e4ab8b2f..ef58708e8f 100644 --- a/yt/yt/core/compression/zstd.h +++ b/yt/yt/core/compression/zstd.h @@ -31,12 +31,16 @@ IDictionaryCompressorPtr ZstdCreateDictionaryCompressor( IDictionaryDecompressorPtr ZstdCreateDictionaryDecompressor( const IDigestedDecompressionDictionaryPtr& digestedDecompressionDictionary); -IDigestedCompressionDictionaryPtr ZstdCreateDigestedCompressionDictionary( +i64 ZstdEstimateDigestedCompressionDictionarySize(i64 dictionarySize, int compressionLevel); +i64 ZstdEstimateDigestedDecompressionDictionarySize(i64 dictionarySize); + +IDigestedCompressionDictionaryPtr ZstdConstructDigestedCompressionDictionary( const TSharedRef& compressionDictionary, + TSharedMutableRef storage, int compressionLevel); - -IDigestedDecompressionDictionaryPtr ZstdCreateDigestedDecompressionDictionary( - const TSharedRef& compressionDictionary); +IDigestedDecompressionDictionaryPtr ZstdConstructDigestedDecompressionDictionary( + const TSharedRef& decompressionDictionary, + TSharedMutableRef storage); TDictionaryCompressionFrameInfo ZstdGetFrameInfo(TRef input); |