diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/blockcodecs | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/blockcodecs')
-rw-r--r-- | library/cpp/blockcodecs/codecs/legacy_zstd06/legacy_zstd06.cpp | 74 | ||||
-rw-r--r-- | library/cpp/blockcodecs/codecs_ut.cpp | 396 | ||||
-rw-r--r-- | library/cpp/blockcodecs/core/codecs.cpp | 176 | ||||
-rw-r--r-- | library/cpp/blockcodecs/core/codecs.h | 110 | ||||
-rw-r--r-- | library/cpp/blockcodecs/core/common.h | 138 | ||||
-rw-r--r-- | library/cpp/blockcodecs/core/stream.cpp | 362 | ||||
-rw-r--r-- | library/cpp/blockcodecs/core/stream.h | 66 | ||||
-rw-r--r-- | library/cpp/blockcodecs/core/ya.make | 20 | ||||
-rw-r--r-- | library/cpp/blockcodecs/fuzz/main.cpp | 14 | ||||
-rw-r--r-- | library/cpp/blockcodecs/fuzz/ya.make | 6 | ||||
-rw-r--r-- | library/cpp/blockcodecs/ut/ya.make | 10 |
11 files changed, 686 insertions, 686 deletions
diff --git a/library/cpp/blockcodecs/codecs/legacy_zstd06/legacy_zstd06.cpp b/library/cpp/blockcodecs/codecs/legacy_zstd06/legacy_zstd06.cpp index 7319008420..042f031679 100644 --- a/library/cpp/blockcodecs/codecs/legacy_zstd06/legacy_zstd06.cpp +++ b/library/cpp/blockcodecs/codecs/legacy_zstd06/legacy_zstd06.cpp @@ -1,58 +1,58 @@ #include <library/cpp/blockcodecs/core/codecs.h> #include <library/cpp/blockcodecs/core/common.h> #include <library/cpp/blockcodecs/core/register.h> - + #include <contrib/libs/zstd06/common/zstd.h> #include <contrib/libs/zstd06/common/zstd_static.h> - -using namespace NBlockCodecs; - -namespace { + +using namespace NBlockCodecs; + +namespace { struct TZStd06Codec: public TAddLengthCodec<TZStd06Codec> { inline TZStd06Codec(unsigned level) - : Level(level) + : Level(level) , MyName(TStringBuf("zstd06_") + ToString(Level)) - { - } - - static inline size_t CheckError(size_t ret, const char* what) { - if (ZSTD_isError(ret)) { + { + } + + static inline size_t CheckError(size_t ret, const char* what) { + if (ZSTD_isError(ret)) { ythrow yexception() << what << TStringBuf(" zstd error: ") << ZSTD_getErrorName(ret); - } - - return ret; - } - + } + + return ret; + } + static inline size_t DoMaxCompressedLength(size_t l) noexcept { - return ZSTD_compressBound(l); - } - - inline size_t DoCompress(const TData& in, void* out) const { + return ZSTD_compressBound(l); + } + + inline size_t DoCompress(const TData& in, void* out) const { return CheckError(ZSTD_compress(out, DoMaxCompressedLength(in.size()), in.data(), in.size(), Level), "compress"); - } - - inline void DoDecompress(const TData& in, void* out, size_t dsize) const { + } + + inline void DoDecompress(const TData& in, void* out, size_t dsize) const { const size_t res = CheckError(ZSTD_decompress(out, dsize, in.data(), in.size()), "decompress"); - - if (res != dsize) { - ythrow TDecompressError(dsize, res); - } - } - + + if (res != dsize) { + ythrow TDecompressError(dsize, res); + } + } + TStringBuf Name() const noexcept override { - return MyName; - } - - const unsigned Level; + return MyName; + } + + const unsigned Level; const TString MyName; - }; - + }; + struct TZStd06Registrar { TZStd06Registrar() { for (unsigned i = 1; i <= ZSTD_maxCLevel(); ++i) { RegisterCodec(MakeHolder<TZStd06Codec>(i)); } - } + } }; const TZStd06Registrar Registrar{}; -} +} diff --git a/library/cpp/blockcodecs/codecs_ut.cpp b/library/cpp/blockcodecs/codecs_ut.cpp index 62acd66f9f..bfe5a23690 100644 --- a/library/cpp/blockcodecs/codecs_ut.cpp +++ b/library/cpp/blockcodecs/codecs_ut.cpp @@ -1,24 +1,24 @@ -#include "codecs.h" -#include "stream.h" - +#include "codecs.h" +#include "stream.h" + #include <library/cpp/testing/unittest/registar.h> - -#include <util/stream/str.h> + +#include <util/stream/str.h> #include <util/string/join.h> -#include <util/digest/multi.h> - +#include <util/digest/multi.h> + Y_UNIT_TEST_SUITE(TBlockCodecsTest) { - using namespace NBlockCodecs; - + using namespace NBlockCodecs; + TBuffer Buffer(TStringBuf b) { TBuffer bb; bb.Assign(b.data(), b.size()); return bb; } - void TestAllAtOnce(size_t n, size_t m) { + void TestAllAtOnce(size_t n, size_t m) { TVector<TBuffer> datas; - + datas.emplace_back(); datas.push_back(Buffer("na gorshke sidel korol")); datas.push_back(Buffer(TStringBuf("", 1))); @@ -26,249 +26,249 @@ Y_UNIT_TEST_SUITE(TBlockCodecsTest) { datas.push_back(Buffer(" ")); datas.push_back(Buffer(" ")); datas.push_back(Buffer(" ")); - - { - TStringStream data; - - for (size_t i = 0; i < 1024; ++i) { - data << " " << i; - } - - datas.push_back(Buffer(data.Str())); - } - - TCodecList lst = ListAllCodecs(); - + + { + TStringStream data; + + for (size_t i = 0; i < 1024; ++i) { + data << " " << i; + } + + datas.push_back(Buffer(data.Str())); + } + + TCodecList lst = ListAllCodecs(); + for (size_t i = 0; i < lst.size(); ++i) { - const ICodec* c = Codec(lst[i]); - const auto h = MultiHash(c->Name(), i, 1); - - if (h % n == m) { - } else { - continue; - } - + const ICodec* c = Codec(lst[i]); + const auto h = MultiHash(c->Name(), i, 1); + + if (h % n == m) { + } else { + continue; + } + for (size_t j = 0; j < datas.size(); ++j) { const TBuffer& data = datas[j]; TString res; - try { + try { TBuffer e, d; c->Encode(data, e); c->Decode(e, d); d.AsString(res); UNIT_ASSERT_EQUAL(NBlockCodecs::TData(res), NBlockCodecs::TData(data)); - } catch (...) { + } catch (...) { Cerr << c->Name() << "(" << res.Quote() << ")(" << TString{NBlockCodecs::TData(data)}.Quote() << ")" << Endl; - - throw; - } - } - } - } - + + throw; + } + } + } + } + Y_UNIT_TEST(TestAllAtOnce0) { - TestAllAtOnce(20, 0); - } - + TestAllAtOnce(20, 0); + } + Y_UNIT_TEST(TestAllAtOnce1) { - TestAllAtOnce(20, 1); - } - + TestAllAtOnce(20, 1); + } + Y_UNIT_TEST(TestAllAtOnce2) { - TestAllAtOnce(20, 2); - } - + TestAllAtOnce(20, 2); + } + Y_UNIT_TEST(TestAllAtOnce3) { - TestAllAtOnce(20, 3); - } - + TestAllAtOnce(20, 3); + } + Y_UNIT_TEST(TestAllAtOnce4) { - TestAllAtOnce(20, 4); - } - + TestAllAtOnce(20, 4); + } + Y_UNIT_TEST(TestAllAtOnce5) { - TestAllAtOnce(20, 5); - } - + TestAllAtOnce(20, 5); + } + Y_UNIT_TEST(TestAllAtOnce6) { - TestAllAtOnce(20, 6); - } - + TestAllAtOnce(20, 6); + } + Y_UNIT_TEST(TestAllAtOnce7) { - TestAllAtOnce(20, 7); - } - + TestAllAtOnce(20, 7); + } + Y_UNIT_TEST(TestAllAtOnce8) { - TestAllAtOnce(20, 8); - } - + TestAllAtOnce(20, 8); + } + Y_UNIT_TEST(TestAllAtOnce9) { - TestAllAtOnce(20, 9); - } - + TestAllAtOnce(20, 9); + } + Y_UNIT_TEST(TestAllAtOnce10) { - TestAllAtOnce(20, 10); - } - + TestAllAtOnce(20, 10); + } + Y_UNIT_TEST(TestAllAtOnce12) { - TestAllAtOnce(20, 12); - } - + TestAllAtOnce(20, 12); + } + Y_UNIT_TEST(TestAllAtOnce13) { - TestAllAtOnce(20, 13); - } - + TestAllAtOnce(20, 13); + } + Y_UNIT_TEST(TestAllAtOnce14) { - TestAllAtOnce(20, 14); - } - + TestAllAtOnce(20, 14); + } + Y_UNIT_TEST(TestAllAtOnce15) { - TestAllAtOnce(20, 15); - } - + TestAllAtOnce(20, 15); + } + Y_UNIT_TEST(TestAllAtOnce16) { - TestAllAtOnce(20, 16); - } - + TestAllAtOnce(20, 16); + } + Y_UNIT_TEST(TestAllAtOnce17) { - TestAllAtOnce(20, 17); - } - + TestAllAtOnce(20, 17); + } + Y_UNIT_TEST(TestAllAtOnce18) { - TestAllAtOnce(20, 18); - } - + TestAllAtOnce(20, 18); + } + Y_UNIT_TEST(TestAllAtOnce19) { - TestAllAtOnce(20, 19); - } - - void TestStreams(size_t n, size_t m) { + TestAllAtOnce(20, 19); + } + + void TestStreams(size_t n, size_t m) { TVector<TString> datas; TString res; - - for (size_t i = 0; i < 256; ++i) { + + for (size_t i = 0; i < 256; ++i) { datas.push_back(TString(i, (char)(i % 128))); - } - + } + for (size_t i = 0; i < datas.size(); ++i) { - res += datas[i]; - } - - TCodecList lst = ListAllCodecs(); - + res += datas[i]; + } + + TCodecList lst = ListAllCodecs(); + for (size_t i = 0; i < lst.size(); ++i) { - TStringStream ss; - - const ICodec* c = Codec(lst[i]); - const auto h = MultiHash(c->Name(), i, 2); - - if (h % n == m) { - } else { - continue; - } - - { - TCodedOutput out(&ss, c, 1234); - + TStringStream ss; + + const ICodec* c = Codec(lst[i]); + const auto h = MultiHash(c->Name(), i, 2); + + if (h % n == m) { + } else { + continue; + } + + { + TCodedOutput out(&ss, c, 1234); + for (size_t j = 0; j < datas.size(); ++j) { - out << datas[j]; - } - - out.Finish(); - } - + out << datas[j]; + } + + out.Finish(); + } + const TString resNew = TDecodedInput(&ss).ReadAll(); - - try { - UNIT_ASSERT_EQUAL(resNew, res); - } catch (...) { - Cerr << c->Name() << Endl; - - throw; - } - } - } - + + try { + UNIT_ASSERT_EQUAL(resNew, res); + } catch (...) { + Cerr << c->Name() << Endl; + + throw; + } + } + } + Y_UNIT_TEST(TestStreams0) { - TestStreams(20, 0); - } - + TestStreams(20, 0); + } + Y_UNIT_TEST(TestStreams1) { - TestStreams(20, 1); - } - + TestStreams(20, 1); + } + Y_UNIT_TEST(TestStreams2) { - TestStreams(20, 2); - } - + TestStreams(20, 2); + } + Y_UNIT_TEST(TestStreams3) { - TestStreams(20, 3); - } - + TestStreams(20, 3); + } + Y_UNIT_TEST(TestStreams4) { - TestStreams(20, 4); - } - + TestStreams(20, 4); + } + Y_UNIT_TEST(TestStreams5) { - TestStreams(20, 5); - } - + TestStreams(20, 5); + } + Y_UNIT_TEST(TestStreams6) { - TestStreams(20, 6); - } - + TestStreams(20, 6); + } + Y_UNIT_TEST(TestStreams7) { - TestStreams(20, 7); - } - + TestStreams(20, 7); + } + Y_UNIT_TEST(TestStreams8) { - TestStreams(20, 8); - } - + TestStreams(20, 8); + } + Y_UNIT_TEST(TestStreams9) { - TestStreams(20, 9); - } - + TestStreams(20, 9); + } + Y_UNIT_TEST(TestStreams10) { - TestStreams(20, 10); - } - + TestStreams(20, 10); + } + Y_UNIT_TEST(TestStreams11) { - TestStreams(20, 11); - } - + TestStreams(20, 11); + } + Y_UNIT_TEST(TestStreams12) { - TestStreams(20, 12); - } - + TestStreams(20, 12); + } + Y_UNIT_TEST(TestStreams13) { - TestStreams(20, 13); - } - + TestStreams(20, 13); + } + Y_UNIT_TEST(TestStreams14) { - TestStreams(20, 14); - } - + TestStreams(20, 14); + } + Y_UNIT_TEST(TestStreams15) { - TestStreams(20, 15); - } - + TestStreams(20, 15); + } + Y_UNIT_TEST(TestStreams16) { - TestStreams(20, 16); - } - + TestStreams(20, 16); + } + Y_UNIT_TEST(TestStreams17) { - TestStreams(20, 17); - } - + TestStreams(20, 17); + } + Y_UNIT_TEST(TestStreams18) { - TestStreams(20, 18); - } - + TestStreams(20, 18); + } + Y_UNIT_TEST(TestStreams19) { - TestStreams(20, 19); - } + TestStreams(20, 19); + } Y_UNIT_TEST(TestMaxPossibleDecompressedSize) { @@ -337,4 +337,4 @@ Y_UNIT_TEST_SUITE(TBlockCodecsTest) { UNIT_ASSERT_VALUES_EQUAL(decoded, data); } } -} +} diff --git a/library/cpp/blockcodecs/core/codecs.cpp b/library/cpp/blockcodecs/core/codecs.cpp index 4f6b15b550..21506e812b 100644 --- a/library/cpp/blockcodecs/core/codecs.cpp +++ b/library/cpp/blockcodecs/core/codecs.cpp @@ -1,91 +1,91 @@ -#include "codecs.h" +#include "codecs.h" #include "common.h" #include "register.h" - -#include <util/ysaveload.h> + +#include <util/ysaveload.h> #include <util/stream/null.h> -#include <util/stream/mem.h> -#include <util/string/cast.h> -#include <util/string/join.h> -#include <util/system/align.h> -#include <util/system/unaligned_mem.h> -#include <util/generic/hash.h> -#include <util/generic/cast.h> +#include <util/stream/mem.h> +#include <util/string/cast.h> +#include <util/string/join.h> +#include <util/system/align.h> +#include <util/system/unaligned_mem.h> +#include <util/generic/hash.h> +#include <util/generic/cast.h> #include <util/generic/deque.h> -#include <util/generic/buffer.h> +#include <util/generic/buffer.h> #include <util/generic/array_ref.h> -#include <util/generic/singleton.h> -#include <util/generic/algorithm.h> -#include <util/generic/mem_copy.h> - -using namespace NBlockCodecs; - -namespace { - - struct TCodecFactory { - inline TCodecFactory() { - Add(&Null); - } - - inline const ICodec* Find(const TStringBuf& name) const { - auto it = Registry.find(name); - - if (it == Registry.end()) { - ythrow TNotFound() << "can not found " << name << " codec"; - } - - return it->second; - } - - inline void ListCodecs(TCodecList& lst) const { +#include <util/generic/singleton.h> +#include <util/generic/algorithm.h> +#include <util/generic/mem_copy.h> + +using namespace NBlockCodecs; + +namespace { + + struct TCodecFactory { + inline TCodecFactory() { + Add(&Null); + } + + inline const ICodec* Find(const TStringBuf& name) const { + auto it = Registry.find(name); + + if (it == Registry.end()) { + ythrow TNotFound() << "can not found " << name << " codec"; + } + + return it->second; + } + + inline void ListCodecs(TCodecList& lst) const { for (const auto& it : Registry) { lst.push_back(it.first); - } - - Sort(lst.begin(), lst.end()); - } - - inline void Add(ICodec* codec) { - Registry[codec->Name()] = codec; - } - + } + + Sort(lst.begin(), lst.end()); + } + + inline void Add(ICodec* codec) { + Registry[codec->Name()] = codec; + } + inline void Add(TCodecPtr codec) { Codecs.push_back(std::move(codec)); Add(Codecs.back().Get()); } - inline void Alias(TStringBuf from, TStringBuf to) { - Tmp.emplace_back(from); - Registry[Tmp.back()] = Registry[to]; - } - + inline void Alias(TStringBuf from, TStringBuf to) { + Tmp.emplace_back(from); + Registry[Tmp.back()] = Registry[to]; + } + TDeque<TString> Tmp; - TNullCodec Null; + TNullCodec Null; TVector<TCodecPtr> Codecs; typedef THashMap<TStringBuf, ICodec*> TRegistry; - TRegistry Registry; + TRegistry Registry; // SEARCH-8344: Global decompressed size limiter (to prevent remote DoS) size_t MaxPossibleDecompressedLength = Max<size_t>(); - }; -} - -const ICodec* NBlockCodecs::Codec(const TStringBuf& name) { - return Singleton<TCodecFactory>()->Find(name); -} - -TCodecList NBlockCodecs::ListAllCodecs() { - TCodecList ret; - - Singleton<TCodecFactory>()->ListCodecs(ret); - - return ret; -} - + }; +} + +const ICodec* NBlockCodecs::Codec(const TStringBuf& name) { + return Singleton<TCodecFactory>()->Find(name); +} + +TCodecList NBlockCodecs::ListAllCodecs() { + TCodecList ret; + + Singleton<TCodecFactory>()->ListCodecs(ret); + + return ret; +} + TString NBlockCodecs::ListAllCodecsAsString() { return JoinSeq(TStringBuf(","), ListAllCodecs()); -} - +} + void NBlockCodecs::RegisterCodec(TCodecPtr codec) { Singleton<TCodecFactory>()->Add(std::move(codec)); } @@ -113,36 +113,36 @@ size_t ICodec::GetDecompressedLength(const TData& in) const { return len; } -void ICodec::Encode(const TData& in, TBuffer& out) const { - const size_t maxLen = MaxCompressedLength(in); - - out.Reserve(maxLen); - out.Resize(Compress(in, out.Data())); -} - -void ICodec::Decode(const TData& in, TBuffer& out) const { +void ICodec::Encode(const TData& in, TBuffer& out) const { + const size_t maxLen = MaxCompressedLength(in); + + out.Reserve(maxLen); + out.Resize(Compress(in, out.Data())); +} + +void ICodec::Decode(const TData& in, TBuffer& out) const { const size_t len = GetDecompressedLength(in); - - out.Reserve(len); - out.Resize(Decompress(in, out.Data())); -} - + + out.Reserve(len); + out.Resize(Decompress(in, out.Data())); +} + void ICodec::Encode(const TData& in, TString& out) const { - const size_t maxLen = MaxCompressedLength(in); + const size_t maxLen = MaxCompressedLength(in); out.ReserveAndResize(maxLen); - + size_t actualLen = Compress(in, out.begin()); Y_ASSERT(actualLen <= maxLen); out.resize(actualLen); -} - +} + void ICodec::Decode(const TData& in, TString& out) const { const size_t maxLen = GetDecompressedLength(in); out.ReserveAndResize(maxLen); - + size_t actualLen = Decompress(in, out.begin()); Y_ASSERT(actualLen <= maxLen); out.resize(actualLen); -} - +} + ICodec::~ICodec() = default; diff --git a/library/cpp/blockcodecs/core/codecs.h b/library/cpp/blockcodecs/core/codecs.h index 90b2e1842c..9c93c00274 100644 --- a/library/cpp/blockcodecs/core/codecs.h +++ b/library/cpp/blockcodecs/core/codecs.h @@ -1,14 +1,14 @@ -#pragma once - -#include <util/generic/buffer.h> -#include <util/generic/strbuf.h> +#pragma once + +#include <util/generic/buffer.h> +#include <util/generic/strbuf.h> #include <util/generic/string.h> #include <util/generic/typetraits.h> -#include <util/generic/vector.h> -#include <util/generic/yexception.h> - -namespace NBlockCodecs { - struct TData: public TStringBuf { +#include <util/generic/vector.h> +#include <util/generic/yexception.h> + +namespace NBlockCodecs { + struct TData: public TStringBuf { inline TData() = default; Y_HAS_MEMBER(Data); @@ -18,68 +18,68 @@ namespace NBlockCodecs { inline TData(const T& t) : TStringBuf((const char*)t.data(), t.size()) { - } - + } + template <class T, std::enable_if_t<THasSize<T>::value && THasData<T>::value, int> = 0> - inline TData(const T& t) + inline TData(const T& t) : TStringBuf((const char*)t.Data(), t.Size()) - { - } - }; - - struct TCodecError: public yexception { - }; - - struct TNotFound: public TCodecError { - }; - - struct TDataError: public TCodecError { - }; - - struct ICodec { - virtual ~ICodec(); - + { + } + }; + + struct TCodecError: public yexception { + }; + + struct TNotFound: public TCodecError { + }; + + struct TDataError: public TCodecError { + }; + + struct ICodec { + virtual ~ICodec(); + // main interface - virtual size_t DecompressedLength(const TData& in) const = 0; - virtual size_t MaxCompressedLength(const TData& in) const = 0; - virtual size_t Compress(const TData& in, void* out) const = 0; - virtual size_t Decompress(const TData& in, void* out) const = 0; - + virtual size_t DecompressedLength(const TData& in) const = 0; + virtual size_t MaxCompressedLength(const TData& in) const = 0; + virtual size_t Compress(const TData& in, void* out) const = 0; + virtual size_t Decompress(const TData& in, void* out) const = 0; + virtual TStringBuf Name() const noexcept = 0; - + // some useful helpers - void Encode(const TData& in, TBuffer& out) const; - void Decode(const TData& in, TBuffer& out) const; - + void Encode(const TData& in, TBuffer& out) const; + void Decode(const TData& in, TBuffer& out) const; + void Encode(const TData& in, TString& out) const; void Decode(const TData& in, TString& out) const; - + inline TString Encode(const TData& in) const { TString out; - - Encode(in, out); - - return out; - } - + + Encode(in, out); + + return out; + } + inline TString Decode(const TData& in) const { TString out; - - Decode(in, out); - - return out; - } + + Decode(in, out); + + return out; + } private: size_t GetDecompressedLength(const TData& in) const; - }; - + }; + using TCodecPtr = THolder<ICodec>; - const ICodec* Codec(const TStringBuf& name); - + const ICodec* Codec(const TStringBuf& name); + // some aux methods typedef TVector<TStringBuf> TCodecList; - TCodecList ListAllCodecs(); + TCodecList ListAllCodecs(); TString ListAllCodecsAsString(); // SEARCH-8344: Get the size of max possible decompressed block @@ -87,4 +87,4 @@ namespace NBlockCodecs { // SEARCH-8344: Globally set the size of max possible decompressed block void SetMaxPossibleDecompressedLength(size_t maxPossibleDecompressedLength); -} +} diff --git a/library/cpp/blockcodecs/core/common.h b/library/cpp/blockcodecs/core/common.h index 33649cf567..f05df4d334 100644 --- a/library/cpp/blockcodecs/core/common.h +++ b/library/cpp/blockcodecs/core/common.h @@ -1,105 +1,105 @@ #pragma once - -#include "codecs.h" - -#include <util/ysaveload.h> + +#include "codecs.h" + +#include <util/ysaveload.h> #include <util/stream/null.h> -#include <util/stream/mem.h> -#include <util/string/cast.h> -#include <util/string/join.h> -#include <util/system/align.h> -#include <util/system/unaligned_mem.h> -#include <util/generic/hash.h> -#include <util/generic/cast.h> -#include <util/generic/buffer.h> +#include <util/stream/mem.h> +#include <util/string/cast.h> +#include <util/string/join.h> +#include <util/system/align.h> +#include <util/system/unaligned_mem.h> +#include <util/generic/hash.h> +#include <util/generic/cast.h> +#include <util/generic/buffer.h> #include <util/generic/array_ref.h> -#include <util/generic/singleton.h> -#include <util/generic/algorithm.h> -#include <util/generic/mem_copy.h> - +#include <util/generic/singleton.h> +#include <util/generic/algorithm.h> +#include <util/generic/mem_copy.h> + namespace NBlockCodecs { - struct TDecompressError: public TDataError { - TDecompressError(int code) { - *this << "cannot decompress (errcode " << code << ")"; - } - - TDecompressError(size_t exp, size_t real) { - *this << "broken input (expected len: " << exp << ", got: " << real << ")"; - } + struct TDecompressError: public TDataError { + TDecompressError(int code) { + *this << "cannot decompress (errcode " << code << ")"; + } + + TDecompressError(size_t exp, size_t real) { + *this << "broken input (expected len: " << exp << ", got: " << real << ")"; + } }; - struct TCompressError: public TDataError { - TCompressError(int code) { - *this << "cannot compress (errcode " << code << ")"; - } + struct TCompressError: public TDataError { + TCompressError(int code) { + *this << "cannot compress (errcode " << code << ")"; + } }; - struct TNullCodec: public ICodec { + struct TNullCodec: public ICodec { size_t DecompressedLength(const TData& in) const override { return in.size(); - } - + } + size_t MaxCompressedLength(const TData& in) const override { return in.size(); - } - + } + size_t Compress(const TData& in, void* out) const override { MemCopy((char*)out, in.data(), in.size()); - + return in.size(); - } - + } + size_t Decompress(const TData& in, void* out) const override { MemCopy((char*)out, in.data(), in.size()); - + return in.size(); - } - + } + TStringBuf Name() const noexcept override { return TStringBuf("null"); - } - }; - - template <class T> - struct TAddLengthCodec: public ICodec { - static inline void Check(const TData& in) { + } + }; + + template <class T> + struct TAddLengthCodec: public ICodec { + static inline void Check(const TData& in) { if (in.size() < sizeof(ui64)) { - ythrow TDataError() << "too small input"; - } - } - + ythrow TDataError() << "too small input"; + } + } + size_t DecompressedLength(const TData& in) const override { - Check(in); - + Check(in); + return ReadUnaligned<ui64>(in.data()); - } - + } + size_t MaxCompressedLength(const TData& in) const override { return T::DoMaxCompressedLength(in.size()) + sizeof(ui64); - } - + } + size_t Compress(const TData& in, void* out) const override { - ui64* ptr = (ui64*)out; - + ui64* ptr = (ui64*)out; + WriteUnaligned<ui64>(ptr, (ui64) in.size()); - + return Base()->DoCompress(!in ? TData(TStringBuf("")) : in, ptr + 1) + sizeof(*ptr); - } - + } + size_t Decompress(const TData& in, void* out) const override { - Check(in); - + Check(in); + const auto len = ReadUnaligned<ui64>(in.data()); - + if (!len) return 0; Base()->DoDecompress(TData(in).Skip(sizeof(len)), out, len); return len; - } - + } + inline const T* Base() const noexcept { - return static_cast<const T*>(this); - } - }; -} + return static_cast<const T*>(this); + } + }; +} diff --git a/library/cpp/blockcodecs/core/stream.cpp b/library/cpp/blockcodecs/core/stream.cpp index c0134dea28..4f7db3c32b 100644 --- a/library/cpp/blockcodecs/core/stream.cpp +++ b/library/cpp/blockcodecs/core/stream.cpp @@ -1,212 +1,212 @@ -#include "stream.h" -#include "codecs.h" - -#include <util/digest/murmur.h> -#include <util/generic/scope.h> -#include <util/generic/cast.h> -#include <util/generic/hash.h> -#include <util/generic/singleton.h> -#include <util/stream/mem.h> -#include <util/ysaveload.h> - -using namespace NBlockCodecs; - -namespace { +#include "stream.h" +#include "codecs.h" + +#include <util/digest/murmur.h> +#include <util/generic/scope.h> +#include <util/generic/cast.h> +#include <util/generic/hash.h> +#include <util/generic/singleton.h> +#include <util/stream/mem.h> +#include <util/ysaveload.h> + +using namespace NBlockCodecs; + +namespace { constexpr size_t MAX_BUF_LEN = 128 * 1024 * 1024; - - typedef ui16 TCodecID; - typedef ui64 TBlockLen; - - struct TIds { - inline TIds() { - const TCodecList lst = ListAllCodecs(); - + + typedef ui16 TCodecID; + typedef ui64 TBlockLen; + + struct TIds { + inline TIds() { + const TCodecList lst = ListAllCodecs(); + for (size_t i = 0; i < lst.size(); ++i) { - const ICodec* c = Codec(lst[i]); - - ByID[CodecID(c)] = c; - } - } - - static inline TCodecID CodecID(const ICodec* c) { - const TStringBuf name = c->Name(); - - union { - ui16 Parts[2]; - ui32 Data; - } x; - + const ICodec* c = Codec(lst[i]); + + ByID[CodecID(c)] = c; + } + } + + static inline TCodecID CodecID(const ICodec* c) { + const TStringBuf name = c->Name(); + + union { + ui16 Parts[2]; + ui32 Data; + } x; + x.Data = MurmurHash<ui32>(name.data(), name.size()); - - return x.Parts[1] ^ x.Parts[0]; - } - - inline const ICodec* Find(TCodecID id) const { - TByID::const_iterator it = ByID.find(id); - - if (it != ByID.end()) { - return it->second; - } - - ythrow yexception() << "can not find codec by id " << id; - } - + + return x.Parts[1] ^ x.Parts[0]; + } + + inline const ICodec* Find(TCodecID id) const { + TByID::const_iterator it = ByID.find(id); + + if (it != ByID.end()) { + return it->second; + } + + ythrow yexception() << "can not find codec by id " << id; + } + typedef THashMap<TCodecID, const ICodec*> TByID; - TByID ByID; - }; - + TByID ByID; + }; + TCodecID CodecID(const ICodec* c) { - return TIds::CodecID(c); - } - + return TIds::CodecID(c); + } + const ICodec* CodecByID(TCodecID id) { - return Singleton<TIds>()->Find(id); - } -} - + return Singleton<TIds>()->Find(id); + } +} + TCodedOutput::TCodedOutput(IOutputStream* out, const ICodec* c, size_t bufLen) - : C_(c) - , D_(bufLen) - , S_(out) -{ - if (bufLen > MAX_BUF_LEN) { + : C_(c) + , D_(bufLen) + , S_(out) +{ + if (bufLen > MAX_BUF_LEN) { ythrow yexception() << TStringBuf("too big buffer size: ") << bufLen; - } -} - + } +} + TCodedOutput::~TCodedOutput() { - try { - Finish(); - } catch (...) { - } -} - -void TCodedOutput::DoWrite(const void* buf, size_t len) { - const char* in = (const char*)buf; - - while (len) { - const size_t avail = D_.Avail(); - - if (len < avail) { - D_.Append(in, len); - - return; - } - - D_.Append(in, avail); - + try { + Finish(); + } catch (...) { + } +} + +void TCodedOutput::DoWrite(const void* buf, size_t len) { + const char* in = (const char*)buf; + + while (len) { + const size_t avail = D_.Avail(); + + if (len < avail) { + D_.Append(in, len); + + return; + } + + D_.Append(in, avail); + Y_ASSERT(!D_.Avail()); - - in += avail; - len -= avail; - + + in += avail; + len -= avail; + Y_VERIFY(FlushImpl(), "flush on writing failed"); - } -} - -bool TCodedOutput::FlushImpl() { - const bool ret = !D_.Empty(); - const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen); - O_.Reserve(C_->MaxCompressedLength(D_) + payload); - - void* out = O_.Data() + payload; - const size_t olen = C_->Compress(D_, out); - - { - TMemoryOutput mo(O_.Data(), payload); - - ::Save(&mo, CodecID(C_)); - ::Save(&mo, SafeIntegerCast<TBlockLen>(olen)); - } - - S_->Write(O_.Data(), payload + olen); - - D_.Clear(); - O_.Clear(); - - return ret; -} - -void TCodedOutput::DoFlush() { - if (S_ && !D_.Empty()) { - FlushImpl(); - } -} - -void TCodedOutput::DoFinish() { - if (S_) { - Y_DEFER { + } +} + +bool TCodedOutput::FlushImpl() { + const bool ret = !D_.Empty(); + const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen); + O_.Reserve(C_->MaxCompressedLength(D_) + payload); + + void* out = O_.Data() + payload; + const size_t olen = C_->Compress(D_, out); + + { + TMemoryOutput mo(O_.Data(), payload); + + ::Save(&mo, CodecID(C_)); + ::Save(&mo, SafeIntegerCast<TBlockLen>(olen)); + } + + S_->Write(O_.Data(), payload + olen); + + D_.Clear(); + O_.Clear(); + + return ret; +} + +void TCodedOutput::DoFlush() { + if (S_ && !D_.Empty()) { + FlushImpl(); + } +} + +void TCodedOutput::DoFinish() { + if (S_) { + Y_DEFER { S_ = nullptr; - }; - - if (FlushImpl()) { - //always write zero-length block as eos marker - FlushImpl(); - } - } -} - + }; + + if (FlushImpl()) { + //always write zero-length block as eos marker + FlushImpl(); + } + } +} + TDecodedInput::TDecodedInput(IInputStream* in) - : S_(in) - , C_(nullptr) -{ -} - -TDecodedInput::TDecodedInput(IInputStream* in, const ICodec* codec) - : S_(in) - , C_(codec) -{ -} - + : S_(in) + , C_(nullptr) +{ +} + +TDecodedInput::TDecodedInput(IInputStream* in, const ICodec* codec) + : S_(in) + , C_(codec) +{ +} + TDecodedInput::~TDecodedInput() = default; - + size_t TDecodedInput::DoUnboundedNext(const void** ptr) { - if (!S_) { + if (!S_) { return 0; - } - + } + TCodecID codecId; TBlockLen blockLen; - - { - const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen); - char buf[32]; - + + { + const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen); + char buf[32]; + S_->LoadOrFail(buf, payload); - - TMemoryInput in(buf, payload); - + + TMemoryInput in(buf, payload); + ::Load(&in, codecId); ::Load(&in, blockLen); - } - + } + if (!blockLen) { S_ = nullptr; - + return 0; - } - + } + if (Y_UNLIKELY(blockLen > 1024 * 1024 * 1024)) { - ythrow yexception() << "block size exceeds 1 GiB"; + ythrow yexception() << "block size exceeds 1 GiB"; } - TBuffer block; + TBuffer block; block.Resize(blockLen); - + S_->LoadOrFail(block.Data(), blockLen); - - auto codec = CodecByID(codecId); - - if (C_) { + + auto codec = CodecByID(codecId); + + if (C_) { Y_ENSURE(C_->Name() == codec->Name(), TStringBuf("incorrect stream codec")); - } - - if (codec->DecompressedLength(block) > MAX_BUF_LEN) { - ythrow yexception() << "broken stream"; - } - - codec->Decode(block, D_); - *ptr = D_.Data(); - + } + + if (codec->DecompressedLength(block) > MAX_BUF_LEN) { + ythrow yexception() << "broken stream"; + } + + codec->Decode(block, D_); + *ptr = D_.Data(); + return D_.Size(); -} +} diff --git a/library/cpp/blockcodecs/core/stream.h b/library/cpp/blockcodecs/core/stream.h index b0d7929f05..fd44ef88f2 100644 --- a/library/cpp/blockcodecs/core/stream.h +++ b/library/cpp/blockcodecs/core/stream.h @@ -1,46 +1,46 @@ -#pragma once - -#include <util/stream/walk.h> -#include <util/stream/input.h> -#include <util/stream/output.h> -#include <util/stream/zerocopy.h> -#include <util/generic/buffer.h> - -namespace NBlockCodecs { +#pragma once + +#include <util/stream/walk.h> +#include <util/stream/input.h> +#include <util/stream/output.h> +#include <util/stream/zerocopy.h> +#include <util/generic/buffer.h> + +namespace NBlockCodecs { struct ICodec; - + class TCodedOutput: public IOutputStream { - public: + public: TCodedOutput(IOutputStream* out, const ICodec* c, size_t bufLen); ~TCodedOutput() override; - - private: + + private: void DoWrite(const void* buf, size_t len) override; void DoFlush() override; void DoFinish() override; - - bool FlushImpl(); - - private: - const ICodec* C_; - TBuffer D_; - TBuffer O_; + + bool FlushImpl(); + + private: + const ICodec* C_; + TBuffer D_; + TBuffer O_; IOutputStream* S_; - }; - + }; + class TDecodedInput: public IWalkInput { - public: + public: TDecodedInput(IInputStream* in); - TDecodedInput(IInputStream* in, const ICodec* codec); - + TDecodedInput(IInputStream* in, const ICodec* codec); + ~TDecodedInput() override; - - private: + + private: size_t DoUnboundedNext(const void** ptr) override; - - private: - TBuffer D_; + + private: + TBuffer D_; IInputStream* S_; - const ICodec* C_; - }; -} + const ICodec* C_; + }; +} diff --git a/library/cpp/blockcodecs/core/ya.make b/library/cpp/blockcodecs/core/ya.make index 1956d7dc5e..069e15927b 100644 --- a/library/cpp/blockcodecs/core/ya.make +++ b/library/cpp/blockcodecs/core/ya.make @@ -1,10 +1,10 @@ -LIBRARY() - -OWNER(pg) - -SRCS( - codecs.cpp - stream.cpp -) - -END() +LIBRARY() + +OWNER(pg) + +SRCS( + codecs.cpp + stream.cpp +) + +END() diff --git a/library/cpp/blockcodecs/fuzz/main.cpp b/library/cpp/blockcodecs/fuzz/main.cpp index 388800face..763c6c5a10 100644 --- a/library/cpp/blockcodecs/fuzz/main.cpp +++ b/library/cpp/blockcodecs/fuzz/main.cpp @@ -4,17 +4,17 @@ #include <library/cpp/blockcodecs/codecs.h> #include <library/cpp/blockcodecs/fuzz/proto/case.pb.h> #include <library/cpp/blockcodecs/stream.h> - + #include <util/stream/input.h> #include <util/stream/length.h> -#include <util/stream/mem.h> +#include <util/stream/mem.h> #include <util/stream/null.h> #include <util/stream/str.h> - + using NBlockCodecs::NFuzz::TPackUnpackCase; using NBlockCodecs::TCodedOutput; using NBlockCodecs::TDecodedInput; - + static void ValidateBufferSize(const ui32 size) { Y_ENSURE(size > 0 && size <= 16ULL * 1024); } @@ -22,14 +22,14 @@ static void ValidateBufferSize(const ui32 size) { static void DoOnlyDecode(const TPackUnpackCase& case_) { if (!case_.GetPacked()) { return; - } - + } + TMemoryInput mi(case_.GetData().data(), case_.GetData().size()); TDecodedInput di(&mi); TNullOutput no; TCountingOutput cno(&no); TransferData(&di, &cno); -} +} static void DoDecodeEncode(const TPackUnpackCase& case_) { auto* const codec = NBlockCodecs::Codec(case_.GetCodecName()); diff --git a/library/cpp/blockcodecs/fuzz/ya.make b/library/cpp/blockcodecs/fuzz/ya.make index a8f6d9f617..bc8becc9e1 100644 --- a/library/cpp/blockcodecs/fuzz/ya.make +++ b/library/cpp/blockcodecs/fuzz/ya.make @@ -2,12 +2,12 @@ OWNER( pg g:util ) - + IF (NOT MSVC) FUZZ() - + SIZE(MEDIUM) - + SRCS( main.cpp ) diff --git a/library/cpp/blockcodecs/ut/ya.make b/library/cpp/blockcodecs/ut/ya.make index b6de366c43..25b882c15b 100644 --- a/library/cpp/blockcodecs/ut/ya.make +++ b/library/cpp/blockcodecs/ut/ya.make @@ -2,14 +2,14 @@ UNITTEST_FOR(library/cpp/blockcodecs) OWNER(pg) -FORK_TESTS() +FORK_TESTS() -FORK_SUBTESTS() +FORK_SUBTESTS() -SPLIT_FACTOR(40) +SPLIT_FACTOR(40) + +TIMEOUT(300) -TIMEOUT(300) - SIZE(MEDIUM) SRCS( |