diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/blockcodecs/core | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/blockcodecs/core')
-rw-r--r-- | library/cpp/blockcodecs/core/codecs.cpp | 148 | ||||
-rw-r--r-- | library/cpp/blockcodecs/core/codecs.h | 90 | ||||
-rw-r--r-- | library/cpp/blockcodecs/core/common.h | 105 | ||||
-rw-r--r-- | library/cpp/blockcodecs/core/register.h | 10 | ||||
-rw-r--r-- | library/cpp/blockcodecs/core/stream.cpp | 212 | ||||
-rw-r--r-- | library/cpp/blockcodecs/core/stream.h | 46 | ||||
-rw-r--r-- | library/cpp/blockcodecs/core/ya.make | 10 |
7 files changed, 621 insertions, 0 deletions
diff --git a/library/cpp/blockcodecs/core/codecs.cpp b/library/cpp/blockcodecs/core/codecs.cpp new file mode 100644 index 0000000000..21506e812b --- /dev/null +++ b/library/cpp/blockcodecs/core/codecs.cpp @@ -0,0 +1,148 @@ +#include "codecs.h" +#include "common.h" +#include "register.h" + +#include <util/ysaveload.h> +#include <util/stream/null.h> +#include <util/stream/mem.h> +#include <util/string/cast.h> +#include <util/string/join.h> +#include <util/system/align.h> +#include <util/system/unaligned_mem.h> +#include <util/generic/hash.h> +#include <util/generic/cast.h> +#include <util/generic/deque.h> +#include <util/generic/buffer.h> +#include <util/generic/array_ref.h> +#include <util/generic/singleton.h> +#include <util/generic/algorithm.h> +#include <util/generic/mem_copy.h> + +using namespace NBlockCodecs; + +namespace { + + struct TCodecFactory { + inline TCodecFactory() { + Add(&Null); + } + + inline const ICodec* Find(const TStringBuf& name) const { + auto it = Registry.find(name); + + if (it == Registry.end()) { + ythrow TNotFound() << "can not found " << name << " codec"; + } + + return it->second; + } + + inline void ListCodecs(TCodecList& lst) const { + for (const auto& it : Registry) { + lst.push_back(it.first); + } + + Sort(lst.begin(), lst.end()); + } + + inline void Add(ICodec* codec) { + Registry[codec->Name()] = codec; + } + + inline void Add(TCodecPtr codec) { + Codecs.push_back(std::move(codec)); + Add(Codecs.back().Get()); + } + + inline void Alias(TStringBuf from, TStringBuf to) { + Tmp.emplace_back(from); + Registry[Tmp.back()] = Registry[to]; + } + + TDeque<TString> Tmp; + TNullCodec Null; + TVector<TCodecPtr> Codecs; + typedef THashMap<TStringBuf, ICodec*> TRegistry; + TRegistry Registry; + + // SEARCH-8344: Global decompressed size limiter (to prevent remote DoS) + size_t MaxPossibleDecompressedLength = Max<size_t>(); + }; +} + +const ICodec* NBlockCodecs::Codec(const TStringBuf& name) { + return Singleton<TCodecFactory>()->Find(name); +} + +TCodecList NBlockCodecs::ListAllCodecs() { + TCodecList ret; + + Singleton<TCodecFactory>()->ListCodecs(ret); + + return ret; +} + +TString NBlockCodecs::ListAllCodecsAsString() { + return JoinSeq(TStringBuf(","), ListAllCodecs()); +} + +void NBlockCodecs::RegisterCodec(TCodecPtr codec) { + Singleton<TCodecFactory>()->Add(std::move(codec)); +} + +void NBlockCodecs::RegisterAlias(TStringBuf from, TStringBuf to) { + Singleton<TCodecFactory>()->Alias(from, to); +} + +void NBlockCodecs::SetMaxPossibleDecompressedLength(size_t maxPossibleDecompressedLength) { + Singleton<TCodecFactory>()->MaxPossibleDecompressedLength = maxPossibleDecompressedLength; +} + +size_t NBlockCodecs::GetMaxPossibleDecompressedLength() { + return Singleton<TCodecFactory>()->MaxPossibleDecompressedLength; +} + +size_t ICodec::GetDecompressedLength(const TData& in) const { + const size_t len = DecompressedLength(in); + + Y_ENSURE( + len <= NBlockCodecs::GetMaxPossibleDecompressedLength(), + "Attempt to decompress the block that is larger than maximum possible decompressed length, " + "see SEARCH-8344 for details. " + ); + return len; +} + +void ICodec::Encode(const TData& in, TBuffer& out) const { + const size_t maxLen = MaxCompressedLength(in); + + out.Reserve(maxLen); + out.Resize(Compress(in, out.Data())); +} + +void ICodec::Decode(const TData& in, TBuffer& out) const { + const size_t len = GetDecompressedLength(in); + + out.Reserve(len); + out.Resize(Decompress(in, out.Data())); +} + +void ICodec::Encode(const TData& in, TString& out) const { + const size_t maxLen = MaxCompressedLength(in); + out.ReserveAndResize(maxLen); + + size_t actualLen = Compress(in, out.begin()); + Y_ASSERT(actualLen <= maxLen); + out.resize(actualLen); +} + +void ICodec::Decode(const TData& in, TString& out) const { + const size_t maxLen = GetDecompressedLength(in); + out.ReserveAndResize(maxLen); + + size_t actualLen = Decompress(in, out.begin()); + Y_ASSERT(actualLen <= maxLen); + out.resize(actualLen); +} + +ICodec::~ICodec() = default; diff --git a/library/cpp/blockcodecs/core/codecs.h b/library/cpp/blockcodecs/core/codecs.h new file mode 100644 index 0000000000..9c93c00274 --- /dev/null +++ b/library/cpp/blockcodecs/core/codecs.h @@ -0,0 +1,90 @@ +#pragma once + +#include <util/generic/buffer.h> +#include <util/generic/strbuf.h> +#include <util/generic/string.h> +#include <util/generic/typetraits.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> + +namespace NBlockCodecs { + struct TData: public TStringBuf { + inline TData() = default; + + Y_HAS_MEMBER(Data); + Y_HAS_MEMBER(Size); + + template <class T, std::enable_if_t<!THasSize<T>::value || !THasData<T>::value, int> = 0> + inline TData(const T& t) + : TStringBuf((const char*)t.data(), t.size()) + { + } + + template <class T, std::enable_if_t<THasSize<T>::value && THasData<T>::value, int> = 0> + inline TData(const T& t) + : TStringBuf((const char*)t.Data(), t.Size()) + { + } + }; + + struct TCodecError: public yexception { + }; + + struct TNotFound: public TCodecError { + }; + + struct TDataError: public TCodecError { + }; + + struct ICodec { + virtual ~ICodec(); + + // main interface + virtual size_t DecompressedLength(const TData& in) const = 0; + virtual size_t MaxCompressedLength(const TData& in) const = 0; + virtual size_t Compress(const TData& in, void* out) const = 0; + virtual size_t Decompress(const TData& in, void* out) const = 0; + + virtual TStringBuf Name() const noexcept = 0; + + // some useful helpers + void Encode(const TData& in, TBuffer& out) const; + void Decode(const TData& in, TBuffer& out) const; + + void Encode(const TData& in, TString& out) const; + void Decode(const TData& in, TString& out) const; + + inline TString Encode(const TData& in) const { + TString out; + + Encode(in, out); + + return out; + } + + inline TString Decode(const TData& in) const { + TString out; + + Decode(in, out); + + return out; + } + private: + size_t GetDecompressedLength(const TData& in) const; + }; + + using TCodecPtr = THolder<ICodec>; + + const ICodec* Codec(const TStringBuf& name); + + // some aux methods + typedef TVector<TStringBuf> TCodecList; + TCodecList ListAllCodecs(); + TString ListAllCodecsAsString(); + + // SEARCH-8344: Get the size of max possible decompressed block + size_t GetMaxPossibleDecompressedLength(); + // SEARCH-8344: Globally set the size of max possible decompressed block + void SetMaxPossibleDecompressedLength(size_t maxPossibleDecompressedLength); + +} diff --git a/library/cpp/blockcodecs/core/common.h b/library/cpp/blockcodecs/core/common.h new file mode 100644 index 0000000000..f05df4d334 --- /dev/null +++ b/library/cpp/blockcodecs/core/common.h @@ -0,0 +1,105 @@ +#pragma once + +#include "codecs.h" + +#include <util/ysaveload.h> +#include <util/stream/null.h> +#include <util/stream/mem.h> +#include <util/string/cast.h> +#include <util/string/join.h> +#include <util/system/align.h> +#include <util/system/unaligned_mem.h> +#include <util/generic/hash.h> +#include <util/generic/cast.h> +#include <util/generic/buffer.h> +#include <util/generic/array_ref.h> +#include <util/generic/singleton.h> +#include <util/generic/algorithm.h> +#include <util/generic/mem_copy.h> + +namespace NBlockCodecs { + struct TDecompressError: public TDataError { + TDecompressError(int code) { + *this << "cannot decompress (errcode " << code << ")"; + } + + TDecompressError(size_t exp, size_t real) { + *this << "broken input (expected len: " << exp << ", got: " << real << ")"; + } + }; + + struct TCompressError: public TDataError { + TCompressError(int code) { + *this << "cannot compress (errcode " << code << ")"; + } + }; + + struct TNullCodec: public ICodec { + size_t DecompressedLength(const TData& in) const override { + return in.size(); + } + + size_t MaxCompressedLength(const TData& in) const override { + return in.size(); + } + + size_t Compress(const TData& in, void* out) const override { + MemCopy((char*)out, in.data(), in.size()); + + return in.size(); + } + + size_t Decompress(const TData& in, void* out) const override { + MemCopy((char*)out, in.data(), in.size()); + + return in.size(); + } + + TStringBuf Name() const noexcept override { + return TStringBuf("null"); + } + }; + + template <class T> + struct TAddLengthCodec: public ICodec { + static inline void Check(const TData& in) { + if (in.size() < sizeof(ui64)) { + ythrow TDataError() << "too small input"; + } + } + + size_t DecompressedLength(const TData& in) const override { + Check(in); + + return ReadUnaligned<ui64>(in.data()); + } + + size_t MaxCompressedLength(const TData& in) const override { + return T::DoMaxCompressedLength(in.size()) + sizeof(ui64); + } + + size_t Compress(const TData& in, void* out) const override { + ui64* ptr = (ui64*)out; + + WriteUnaligned<ui64>(ptr, (ui64) in.size()); + + return Base()->DoCompress(!in ? TData(TStringBuf("")) : in, ptr + 1) + sizeof(*ptr); + } + + size_t Decompress(const TData& in, void* out) const override { + Check(in); + + const auto len = ReadUnaligned<ui64>(in.data()); + + if (!len) + return 0; + + Base()->DoDecompress(TData(in).Skip(sizeof(len)), out, len); + return len; + } + + inline const T* Base() const noexcept { + return static_cast<const T*>(this); + } + }; +} diff --git a/library/cpp/blockcodecs/core/register.h b/library/cpp/blockcodecs/core/register.h new file mode 100644 index 0000000000..fa1186dd70 --- /dev/null +++ b/library/cpp/blockcodecs/core/register.h @@ -0,0 +1,10 @@ +#pragma once + +#include "codecs.h" + +namespace NBlockCodecs{ + + void RegisterCodec(TCodecPtr codec); + void RegisterAlias(TStringBuf from, TStringBuf to); + +} diff --git a/library/cpp/blockcodecs/core/stream.cpp b/library/cpp/blockcodecs/core/stream.cpp new file mode 100644 index 0000000000..4f7db3c32b --- /dev/null +++ b/library/cpp/blockcodecs/core/stream.cpp @@ -0,0 +1,212 @@ +#include "stream.h" +#include "codecs.h" + +#include <util/digest/murmur.h> +#include <util/generic/scope.h> +#include <util/generic/cast.h> +#include <util/generic/hash.h> +#include <util/generic/singleton.h> +#include <util/stream/mem.h> +#include <util/ysaveload.h> + +using namespace NBlockCodecs; + +namespace { + constexpr size_t MAX_BUF_LEN = 128 * 1024 * 1024; + + typedef ui16 TCodecID; + typedef ui64 TBlockLen; + + struct TIds { + inline TIds() { + const TCodecList lst = ListAllCodecs(); + + for (size_t i = 0; i < lst.size(); ++i) { + const ICodec* c = Codec(lst[i]); + + ByID[CodecID(c)] = c; + } + } + + static inline TCodecID CodecID(const ICodec* c) { + const TStringBuf name = c->Name(); + + union { + ui16 Parts[2]; + ui32 Data; + } x; + + x.Data = MurmurHash<ui32>(name.data(), name.size()); + + return x.Parts[1] ^ x.Parts[0]; + } + + inline const ICodec* Find(TCodecID id) const { + TByID::const_iterator it = ByID.find(id); + + if (it != ByID.end()) { + return it->second; + } + + ythrow yexception() << "can not find codec by id " << id; + } + + typedef THashMap<TCodecID, const ICodec*> TByID; + TByID ByID; + }; + + TCodecID CodecID(const ICodec* c) { + return TIds::CodecID(c); + } + + const ICodec* CodecByID(TCodecID id) { + return Singleton<TIds>()->Find(id); + } +} + +TCodedOutput::TCodedOutput(IOutputStream* out, const ICodec* c, size_t bufLen) + : C_(c) + , D_(bufLen) + , S_(out) +{ + if (bufLen > MAX_BUF_LEN) { + ythrow yexception() << TStringBuf("too big buffer size: ") << bufLen; + } +} + +TCodedOutput::~TCodedOutput() { + try { + Finish(); + } catch (...) { + } +} + +void TCodedOutput::DoWrite(const void* buf, size_t len) { + const char* in = (const char*)buf; + + while (len) { + const size_t avail = D_.Avail(); + + if (len < avail) { + D_.Append(in, len); + + return; + } + + D_.Append(in, avail); + + Y_ASSERT(!D_.Avail()); + + in += avail; + len -= avail; + + Y_VERIFY(FlushImpl(), "flush on writing failed"); + } +} + +bool TCodedOutput::FlushImpl() { + const bool ret = !D_.Empty(); + const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen); + O_.Reserve(C_->MaxCompressedLength(D_) + payload); + + void* out = O_.Data() + payload; + const size_t olen = C_->Compress(D_, out); + + { + TMemoryOutput mo(O_.Data(), payload); + + ::Save(&mo, CodecID(C_)); + ::Save(&mo, SafeIntegerCast<TBlockLen>(olen)); + } + + S_->Write(O_.Data(), payload + olen); + + D_.Clear(); + O_.Clear(); + + return ret; +} + +void TCodedOutput::DoFlush() { + if (S_ && !D_.Empty()) { + FlushImpl(); + } +} + +void TCodedOutput::DoFinish() { + if (S_) { + Y_DEFER { + S_ = nullptr; + }; + + if (FlushImpl()) { + //always write zero-length block as eos marker + FlushImpl(); + } + } +} + +TDecodedInput::TDecodedInput(IInputStream* in) + : S_(in) + , C_(nullptr) +{ +} + +TDecodedInput::TDecodedInput(IInputStream* in, const ICodec* codec) + : S_(in) + , C_(codec) +{ +} + +TDecodedInput::~TDecodedInput() = default; + +size_t TDecodedInput::DoUnboundedNext(const void** ptr) { + if (!S_) { + return 0; + } + + TCodecID codecId; + TBlockLen blockLen; + + { + const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen); + char buf[32]; + + S_->LoadOrFail(buf, payload); + + TMemoryInput in(buf, payload); + + ::Load(&in, codecId); + ::Load(&in, blockLen); + } + + if (!blockLen) { + S_ = nullptr; + + return 0; + } + + if (Y_UNLIKELY(blockLen > 1024 * 1024 * 1024)) { + ythrow yexception() << "block size exceeds 1 GiB"; + } + + TBuffer block; + block.Resize(blockLen); + + S_->LoadOrFail(block.Data(), blockLen); + + auto codec = CodecByID(codecId); + + if (C_) { + Y_ENSURE(C_->Name() == codec->Name(), TStringBuf("incorrect stream codec")); + } + + if (codec->DecompressedLength(block) > MAX_BUF_LEN) { + ythrow yexception() << "broken stream"; + } + + codec->Decode(block, D_); + *ptr = D_.Data(); + + return D_.Size(); +} diff --git a/library/cpp/blockcodecs/core/stream.h b/library/cpp/blockcodecs/core/stream.h new file mode 100644 index 0000000000..fd44ef88f2 --- /dev/null +++ b/library/cpp/blockcodecs/core/stream.h @@ -0,0 +1,46 @@ +#pragma once + +#include <util/stream/walk.h> +#include <util/stream/input.h> +#include <util/stream/output.h> +#include <util/stream/zerocopy.h> +#include <util/generic/buffer.h> + +namespace NBlockCodecs { + struct ICodec; + + class TCodedOutput: public IOutputStream { + public: + TCodedOutput(IOutputStream* out, const ICodec* c, size_t bufLen); + ~TCodedOutput() override; + + private: + void DoWrite(const void* buf, size_t len) override; + void DoFlush() override; + void DoFinish() override; + + bool FlushImpl(); + + private: + const ICodec* C_; + TBuffer D_; + TBuffer O_; + IOutputStream* S_; + }; + + class TDecodedInput: public IWalkInput { + public: + TDecodedInput(IInputStream* in); + TDecodedInput(IInputStream* in, const ICodec* codec); + + ~TDecodedInput() override; + + private: + size_t DoUnboundedNext(const void** ptr) override; + + private: + TBuffer D_; + IInputStream* S_; + const ICodec* C_; + }; +} diff --git a/library/cpp/blockcodecs/core/ya.make b/library/cpp/blockcodecs/core/ya.make new file mode 100644 index 0000000000..069e15927b --- /dev/null +++ b/library/cpp/blockcodecs/core/ya.make @@ -0,0 +1,10 @@ +LIBRARY() + +OWNER(pg) + +SRCS( + codecs.cpp + stream.cpp +) + +END() |