aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/blockcodecs/core
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/blockcodecs/core
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/blockcodecs/core')
-rw-r--r--library/cpp/blockcodecs/core/codecs.cpp148
-rw-r--r--library/cpp/blockcodecs/core/codecs.h90
-rw-r--r--library/cpp/blockcodecs/core/common.h105
-rw-r--r--library/cpp/blockcodecs/core/register.h10
-rw-r--r--library/cpp/blockcodecs/core/stream.cpp212
-rw-r--r--library/cpp/blockcodecs/core/stream.h46
-rw-r--r--library/cpp/blockcodecs/core/ya.make10
7 files changed, 621 insertions, 0 deletions
diff --git a/library/cpp/blockcodecs/core/codecs.cpp b/library/cpp/blockcodecs/core/codecs.cpp
new file mode 100644
index 0000000000..21506e812b
--- /dev/null
+++ b/library/cpp/blockcodecs/core/codecs.cpp
@@ -0,0 +1,148 @@
+#include "codecs.h"
+#include "common.h"
+#include "register.h"
+
+#include <util/ysaveload.h>
+#include <util/stream/null.h>
+#include <util/stream/mem.h>
+#include <util/string/cast.h>
+#include <util/string/join.h>
+#include <util/system/align.h>
+#include <util/system/unaligned_mem.h>
+#include <util/generic/hash.h>
+#include <util/generic/cast.h>
+#include <util/generic/deque.h>
+#include <util/generic/buffer.h>
+#include <util/generic/array_ref.h>
+#include <util/generic/singleton.h>
+#include <util/generic/algorithm.h>
+#include <util/generic/mem_copy.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+
+ // Registry that maps codec names (and aliases) to codec instances.
+ // The NBlockCodecs free functions in this file reach it through
+ // Singleton<TCodecFactory>().
+ struct TCodecFactory {
+     // The built-in "null" codec is registered on construction.
+     inline TCodecFactory() {
+         Add(&Null);
+     }
+
+     // Returns the codec registered under `name` (a codec name or an alias).
+     // Throws TNotFound when nothing is registered under that name.
+     inline const ICodec* Find(const TStringBuf& name) const {
+         const auto it = Registry.find(name);
+
+         if (it == Registry.end()) {
+             ythrow TNotFound() << "can not found " << name << " codec";
+         }
+
+         return it->second;
+     }
+
+     // Appends every registered name (aliases included) to `lst`,
+     // then sorts the whole of `lst`.
+     inline void ListCodecs(TCodecList& lst) const {
+         for (const auto& entry : Registry) {
+             lst.push_back(entry.first);
+         }
+
+         Sort(lst.begin(), lst.end());
+     }
+
+     // Registers a codec that is owned elsewhere. The registry key is the
+     // TStringBuf returned by codec->Name(), so the storage behind that name
+     // must outlive the registry entry.
+     inline void Add(ICodec* codec) {
+         Registry[codec->Name()] = codec;
+     }
+
+     // Takes ownership of `codec` and registers it under its own name.
+     inline void Add(TCodecPtr codec) {
+         Codecs.push_back(std::move(codec));
+         Add(Codecs.back().Get());
+     }
+
+     // Makes `from` resolve to the codec currently registered as `to`.
+     // Throws TNotFound when `to` is not registered. The previous
+     // `Registry[to]` lookup silently inserted a null entry keyed by the
+     // caller's buffer and mapped the alias to nullptr.
+     inline void Alias(TStringBuf from, TStringBuf to) {
+         const auto target = Registry.find(to);
+
+         if (target == Registry.end()) {
+             ythrow TNotFound() << "can not alias " << from << " to unknown codec " << to;
+         }
+
+         ICodec* const codec = target->second;
+
+         // The alias text is copied into Tmp so the TStringBuf key has stable storage.
+         Tmp.emplace_back(from);
+         Registry[Tmp.back()] = codec;
+     }
+
+     TDeque<TString> Tmp;       // owns the strings used as alias keys
+     TNullCodec Null;           // built-in pass-through codec
+     TVector<TCodecPtr> Codecs; // owns codecs registered via Add(TCodecPtr)
+     typedef THashMap<TStringBuf, ICodec*> TRegistry;
+     TRegistry Registry;        // name/alias -> codec, non-owning
+
+     // SEARCH-8344: Global decompressed size limiter (to prevent remote DoS)
+     size_t MaxPossibleDecompressedLength = Max<size_t>();
+ };
+}
+
+// Resolves a codec by name or alias through the process-wide factory.
+// Throws TNotFound when no codec is registered under `name`.
+const ICodec* NBlockCodecs::Codec(const TStringBuf& name) {
+    const TCodecFactory* const factory = Singleton<TCodecFactory>();
+
+    return factory->Find(name);
+}
+
+// Returns the sorted list of every registered codec name and alias.
+TCodecList NBlockCodecs::ListAllCodecs() {
+    TCodecList names;
+    Singleton<TCodecFactory>()->ListCodecs(names);
+    return names;
+}
+
+// Same names as ListAllCodecs(), joined with "," into a single string.
+TString NBlockCodecs::ListAllCodecsAsString() {
+    const TCodecList names = ListAllCodecs();
+
+    return JoinSeq(TStringBuf(","), names);
+}
+
+// Hands ownership of `codec` to the process-wide factory and registers it
+// under the name it reports.
+void NBlockCodecs::RegisterCodec(TCodecPtr codec) {
+    TCodecFactory* const factory = Singleton<TCodecFactory>();
+
+    factory->Add(std::move(codec));
+}
+
+// Registers `from` as an additional name for the codec registered as `to`.
+void NBlockCodecs::RegisterAlias(TStringBuf from, TStringBuf to) {
+    TCodecFactory* const factory = Singleton<TCodecFactory>();
+
+    factory->Alias(from, to);
+}
+
+// Stores the global decompressed-size limit in the factory singleton.
+// The write is a plain assignment with no synchronization.
+void NBlockCodecs::SetMaxPossibleDecompressedLength(size_t maxPossibleDecompressedLength) {
+    TCodecFactory* const factory = Singleton<TCodecFactory>();
+
+    factory->MaxPossibleDecompressedLength = maxPossibleDecompressedLength;
+}
+
+// Reads back the global decompressed-size limit (Max<size_t>() unless it was changed).
+size_t NBlockCodecs::GetMaxPossibleDecompressedLength() {
+    const TCodecFactory* const factory = Singleton<TCodecFactory>();
+
+    return factory->MaxPossibleDecompressedLength;
+}
+
+// Asks the codec for the decompressed size of `in` and rejects it when it
+// exceeds the global limit (SEARCH-8344). Throws via Y_ENSURE on violation.
+size_t ICodec::GetDecompressedLength(const TData& in) const {
+    const size_t decompressedLen = DecompressedLength(in);
+    const size_t limit = NBlockCodecs::GetMaxPossibleDecompressedLength();
+
+    Y_ENSURE(
+        decompressedLen <= limit,
+        "Attempt to decompress the block that is larger than maximum possible decompressed length, "
+        "see SEARCH-8344 for details. "
+    );
+
+    return decompressedLen;
+}
+
+// Compresses `in` into `out`. The buffer is grown to the codec's worst-case
+// size first and then resized to the number of bytes actually produced.
+void ICodec::Encode(const TData& in, TBuffer& out) const {
+    const size_t maxLen = MaxCompressedLength(in);
+
+    out.Reserve(maxLen);
+
+    const size_t actualLen = Compress(in, out.Data());
+    // Same debug-time overflow check as the TString overload.
+    Y_ASSERT(actualLen <= maxLen);
+    out.Resize(actualLen);
+}
+
+// Decompresses `in` into `out`. The expected size is validated against the
+// global limit by GetDecompressedLength() before any memory is reserved.
+void ICodec::Decode(const TData& in, TBuffer& out) const {
+    const size_t maxLen = GetDecompressedLength(in);
+
+    out.Reserve(maxLen);
+
+    const size_t actualLen = Decompress(in, out.Data());
+    // Same debug-time overflow check as the TString overload.
+    Y_ASSERT(actualLen <= maxLen);
+    out.Resize(actualLen);
+}
+
+// Compresses `in` into the string `out`: the string is sized for the worst
+// case, filled in place, then shrunk to the bytes actually written.
+void ICodec::Encode(const TData& in, TString& out) const {
+    const size_t capacity = MaxCompressedLength(in);
+    out.ReserveAndResize(capacity);
+
+    const size_t written = Compress(in, out.begin());
+    Y_ASSERT(written <= capacity);
+
+    out.resize(written);
+}
+
+// Decompresses `in` into the string `out`. The size reported by the codec is
+// checked against the global limit before the string is resized.
+void ICodec::Decode(const TData& in, TString& out) const {
+    const size_t capacity = GetDecompressedLength(in);
+    out.ReserveAndResize(capacity);
+
+    const size_t written = Decompress(in, out.begin());
+    Y_ASSERT(written <= capacity);
+
+    out.resize(written);
+}
+
+// Out-of-line definition of the virtual destructor declared in codecs.h.
+ICodec::~ICodec() = default;
diff --git a/library/cpp/blockcodecs/core/codecs.h b/library/cpp/blockcodecs/core/codecs.h
new file mode 100644
index 0000000000..9c93c00274
--- /dev/null
+++ b/library/cpp/blockcodecs/core/codecs.h
@@ -0,0 +1,90 @@
+#pragma once
+
+#include <util/generic/buffer.h>
+#include <util/generic/strbuf.h>
+#include <util/generic/string.h>
+#include <util/generic/typetraits.h>
+#include <util/generic/vector.h>
+#include <util/generic/yexception.h>
+
+namespace NBlockCodecs {
+ // Input argument type of the codec interface: a TStringBuf built from the
+ // pointer and size of an arbitrary container. The two constructor templates
+ // let it accept both data()/size() and Data()/Size() style containers.
+ struct TData: public TStringBuf {
+ inline TData() = default;
+
+ // Generate the THasData / THasSize traits used in the constraints below.
+ Y_HAS_MEMBER(Data);
+ Y_HAS_MEMBER(Size);
+
+ // Selected when T lacks Data() or Size(): uses t.data() / t.size().
+ template <class T, std::enable_if_t<!THasSize<T>::value || !THasData<T>::value, int> = 0>
+ inline TData(const T& t)
+ : TStringBuf((const char*)t.data(), t.size())
+ {
+ }
+
+ // Selected when T has both Data() and Size(): uses t.Data() / t.Size().
+ template <class T, std::enable_if_t<THasSize<T>::value && THasData<T>::value, int> = 0>
+ inline TData(const T& t)
+ : TStringBuf((const char*)t.Data(), t.Size())
+ {
+ }
+ };
+
+ // Root of the codec exception hierarchy.
+ struct TCodecError: public yexception {
+ };
+
+ // Thrown when a codec lookup by name fails.
+ struct TNotFound: public TCodecError {
+ };
+
+ // Base for errors caused by the data being processed (see common.h).
+ struct TDataError: public TCodecError {
+ };
+
+ // Interface of a block codec: four primitive operations implemented by each
+ // codec, plus non-virtual convenience wrappers that manage output storage.
+ struct ICodec {
+     virtual ~ICodec();
+
+     // main interface
+
+     // Size the decompressed form of `in` is expected to have.
+     virtual size_t DecompressedLength(const TData& in) const = 0;
+     // Upper bound on the compressed size of `in`.
+     virtual size_t MaxCompressedLength(const TData& in) const = 0;
+     // Compresses `in` into `out`; returns the number of bytes written.
+     // Encode() sizes `out` with MaxCompressedLength(in).
+     virtual size_t Compress(const TData& in, void* out) const = 0;
+     // Decompresses `in` into `out`; returns the number of bytes written.
+     // Decode() sizes `out` with DecompressedLength(in).
+     virtual size_t Decompress(const TData& in, void* out) const = 0;
+
+     // Name the codec is registered under.
+     virtual TStringBuf Name() const noexcept = 0;
+
+     // some useful helpers
+
+     // Compress / decompress into a caller-supplied TBuffer.
+     void Encode(const TData& in, TBuffer& out) const;
+     void Decode(const TData& in, TBuffer& out) const;
+
+     // Compress / decompress into a caller-supplied TString.
+     void Encode(const TData& in, TString& out) const;
+     void Decode(const TData& in, TString& out) const;
+
+     // Compresses `in` and returns the result as a new string.
+     inline TString Encode(const TData& in) const {
+         TString encoded;
+         Encode(in, encoded);
+         return encoded;
+     }
+
+     // Decompresses `in` and returns the result as a new string.
+     inline TString Decode(const TData& in) const {
+         TString decoded;
+         Decode(in, decoded);
+         return decoded;
+     }
+ private:
+     // DecompressedLength() plus a check against the global size limit.
+     size_t GetDecompressedLength(const TData& in) const;
+ };
+
+ // Owning pointer to a codec, as accepted by RegisterCodec().
+ using TCodecPtr = THolder<ICodec>;
+
+ // Looks a codec up by name or alias; throws TNotFound when it is not registered.
+ const ICodec* Codec(const TStringBuf& name);
+
+ // some aux methods
+ typedef TVector<TStringBuf> TCodecList;
+ // Sorted names of all registered codecs and aliases.
+ TCodecList ListAllCodecs();
+ // The same names joined with ",".
+ TString ListAllCodecsAsString();
+
+ // SEARCH-8344: Get the size of max possible decompressed block
+ size_t GetMaxPossibleDecompressedLength();
+ // SEARCH-8344: Globally set the size of max possible decompressed block
+ void SetMaxPossibleDecompressedLength(size_t maxPossibleDecompressedLength);
+
+}
diff --git a/library/cpp/blockcodecs/core/common.h b/library/cpp/blockcodecs/core/common.h
new file mode 100644
index 0000000000..f05df4d334
--- /dev/null
+++ b/library/cpp/blockcodecs/core/common.h
@@ -0,0 +1,105 @@
+#pragma once
+
+#include "codecs.h"
+
+#include <util/ysaveload.h>
+#include <util/stream/null.h>
+#include <util/stream/mem.h>
+#include <util/string/cast.h>
+#include <util/string/join.h>
+#include <util/system/align.h>
+#include <util/system/unaligned_mem.h>
+#include <util/generic/hash.h>
+#include <util/generic/cast.h>
+#include <util/generic/buffer.h>
+#include <util/generic/array_ref.h>
+#include <util/generic/singleton.h>
+#include <util/generic/algorithm.h>
+#include <util/generic/mem_copy.h>
+
+namespace NBlockCodecs {
+ // Data error raised by codec implementations when decompression fails.
+ // Each constructor pre-fills the exception message.
+ struct TDecompressError: public TDataError {
+ // `code` is the error code reported by the underlying decompressor.
+ TDecompressError(int code) {
+ *this << "cannot decompress (errcode " << code << ")";
+ }
+
+ // Length mismatch: `exp` is the expected length, `real` the one obtained.
+ TDecompressError(size_t exp, size_t real) {
+ *this << "broken input (expected len: " << exp << ", got: " << real << ")";
+ }
+ };
+
+ // Data error raised by codec implementations when compression fails.
+ struct TCompressError: public TDataError {
+ // `code` is the error code reported by the underlying compressor.
+ TCompressError(int code) {
+ *this << "cannot compress (errcode " << code << ")";
+ }
+ };
+
+ // Pass-through codec registered under the name "null": both directions copy
+ // the input bytes unchanged, so every length equals the input size.
+ struct TNullCodec: public ICodec {
+     size_t DecompressedLength(const TData& in) const override {
+         return in.size();
+     }
+
+     size_t MaxCompressedLength(const TData& in) const override {
+         return in.size();
+     }
+
+     size_t Compress(const TData& in, void* out) const override {
+         return CopyThrough(in, out);
+     }
+
+     size_t Decompress(const TData& in, void* out) const override {
+         return CopyThrough(in, out);
+     }
+
+     TStringBuf Name() const noexcept override {
+         return TStringBuf("null");
+     }
+
+ private:
+     // Copies all of `src` to `dst` and reports how many bytes were copied.
+     static size_t CopyThrough(const TData& src, void* dst) {
+         const size_t total = src.size();
+
+         MemCopy((char*)dst, src.data(), total);
+
+         return total;
+     }
+ };
+
+ // Base for codecs whose compressed form is an 8-byte (ui64) original length
+ // followed by the codec-specific payload. `this` is cast to `const T*` in
+ // Base(), so T is expected to derive from TAddLengthCodec<T> and to provide
+ // DoMaxCompressedLength(), DoCompress() and DoDecompress() as called below.
+ template <class T>
+ struct TAddLengthCodec: public ICodec {
+ // Throws TDataError when `in` is too short to hold the length prefix.
+ static inline void Check(const TData& in) {
+ if (in.size() < sizeof(ui64)) {
+ ythrow TDataError() << "too small input";
+ }
+ }
+
+ // Returns the length stored in the prefix; the payload is not inspected.
+ size_t DecompressedLength(const TData& in) const override {
+ Check(in);
+
+ return ReadUnaligned<ui64>(in.data());
+ }
+
+ // Worst case of the wrapped codec plus room for the length prefix.
+ size_t MaxCompressedLength(const TData& in) const override {
+ return T::DoMaxCompressedLength(in.size()) + sizeof(ui64);
+ }
+
+ // Writes the original size first, then lets T compress right after it.
+ size_t Compress(const TData& in, void* out) const override {
+ ui64* ptr = (ui64*)out;
+
+ WriteUnaligned<ui64>(ptr, (ui64) in.size());
+
+ // An empty input is replaced by a view over an empty literal, presumably so
+ // that DoCompress never receives a null data pointer (TODO confirm).
+ return Base()->DoCompress(!in ? TData(TStringBuf("")) : in, ptr + 1) + sizeof(*ptr);
+ }
+
+ // Reads the stored length, then lets T decompress the bytes after the prefix.
+ // The value returned by DoDecompress is ignored; the stored length is returned.
+ size_t Decompress(const TData& in, void* out) const override {
+ Check(in);
+
+ const auto len = ReadUnaligned<ui64>(in.data());
+
+ // A stored length of zero means there is nothing to decode.
+ if (!len)
+ return 0;
+
+ Base()->DoDecompress(TData(in).Skip(sizeof(len)), out, len);
+ return len;
+ }
+
+ // Access to the derived codec.
+ inline const T* Base() const noexcept {
+ return static_cast<const T*>(this);
+ }
+ };
+}
diff --git a/library/cpp/blockcodecs/core/register.h b/library/cpp/blockcodecs/core/register.h
new file mode 100644
index 0000000000..fa1186dd70
--- /dev/null
+++ b/library/cpp/blockcodecs/core/register.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include "codecs.h"
+
+// Registration entry points used to add codecs to the registry.
+namespace NBlockCodecs{
+
+ // Transfers ownership of `codec` to the registry.
+ void RegisterCodec(TCodecPtr codec);
+ // Makes `from` an additional name for the codec registered as `to`.
+ void RegisterAlias(TStringBuf from, TStringBuf to);
+
+}
diff --git a/library/cpp/blockcodecs/core/stream.cpp b/library/cpp/blockcodecs/core/stream.cpp
new file mode 100644
index 0000000000..4f7db3c32b
--- /dev/null
+++ b/library/cpp/blockcodecs/core/stream.cpp
@@ -0,0 +1,212 @@
+#include "stream.h"
+#include "codecs.h"
+
+#include <util/digest/murmur.h>
+#include <util/generic/scope.h>
+#include <util/generic/cast.h>
+#include <util/generic/hash.h>
+#include <util/generic/singleton.h>
+#include <util/stream/mem.h>
+#include <util/ysaveload.h>
+
+using namespace NBlockCodecs;
+
+namespace {
+ // Upper bound (128 MiB) for the output buffer size and for the decompressed
+ // size of a single block read from a stream.
+ constexpr size_t MAX_BUF_LEN = 128 * 1024 * 1024;
+
+ // Types of the two fields of the per-block header: codec id, then block length.
+ typedef ui16 TCodecID;
+ typedef ui64 TBlockLen;
+
+ // Maps the 16-bit codec id stored in each block header back to a codec.
+ // The table is filled once, in the constructor, from the codecs registered
+ // at that moment; codecs registered afterwards are not visible through it.
+ struct TIds {
+     inline TIds() {
+         const TCodecList names = ListAllCodecs();
+
+         for (const TStringBuf name : names) {
+             const ICodec* codec = Codec(name);
+
+             // An alias resolves to the same codec and therefore the same id.
+             // If two different codecs hash to the same id, the later one in
+             // the sorted list silently replaces the earlier one.
+             ByID[CodecID(codec)] = codec;
+         }
+     }
+
+     // Stream id of a codec: the 32-bit MurmurHash of its name, folded to
+     // 16 bits by XOR-ing the two halves. Plain shift/mask arithmetic
+     // replaces the former union-based type punning (reading an inactive
+     // union member is undefined behaviour in C++); the value is unchanged
+     // because XOR of the two halves does not depend on byte order.
+     static inline TCodecID CodecID(const ICodec* c) {
+         const TStringBuf name = c->Name();
+         const ui32 hash = MurmurHash<ui32>(name.data(), name.size());
+
+         return static_cast<TCodecID>((hash >> 16) ^ (hash & 0xFFFFu));
+     }
+
+     // Returns the codec with the given id; throws yexception when it is unknown.
+     inline const ICodec* Find(TCodecID id) const {
+         const TByID::const_iterator it = ByID.find(id);
+
+         if (it != ByID.end()) {
+             return it->second;
+         }
+
+         ythrow yexception() << "can not find codec by id " << id;
+     }
+
+     typedef THashMap<TCodecID, const ICodec*> TByID;
+     TByID ByID;
+ };
+
+ // Stream id written in front of every block produced with codec `c`.
+ TCodecID CodecID(const ICodec* c) {
+     const TCodecID id = TIds::CodecID(c);
+
+     return id;
+ }
+
+ // Reverse lookup through the id table singleton; throws when `id` is unknown.
+ const ICodec* CodecByID(TCodecID id) {
+     const TIds* const table = Singleton<TIds>();
+
+     return table->Find(id);
+ }
+}
+
+// Rejects buffer sizes above MAX_BUF_LEN. Called from the member initializer
+// list so the check runs before the data buffer is allocated.
+static size_t CheckedBufLen(size_t bufLen) {
+    if (bufLen > MAX_BUF_LEN) {
+        ythrow yexception() << TStringBuf("too big buffer size: ") << bufLen;
+    }
+
+    return bufLen;
+}
+
+// Creates a stream that collects up to `bufLen` bytes, compresses them with
+// codec `c` and writes the resulting blocks to `out`.
+// Throws yexception when `bufLen` exceeds MAX_BUF_LEN. Previously the buffer
+// was allocated first, so an oversized request could allocate (or fail in the
+// allocator) before the intended check was reached.
+TCodedOutput::TCodedOutput(IOutputStream* out, const ICodec* c, size_t bufLen)
+    : C_(c)
+    , D_(CheckedBufLen(bufLen))
+    , S_(out)
+{
+}
+
+// Finishes the stream on destruction. Any exception from Finish() is
+// swallowed here and never leaves the destructor.
+TCodedOutput::~TCodedOutput() {
+ try {
+ Finish();
+ } catch (...) {
+ }
+}
+
+// Appends `len` bytes to the pending buffer. Each time the buffer becomes
+// full it is compressed and written out as one block.
+void TCodedOutput::DoWrite(const void* buf, size_t len) {
+    const char* cursor = static_cast<const char*>(buf);
+    size_t remaining = len;
+
+    while (remaining != 0) {
+        const size_t room = D_.Avail();
+
+        if (remaining >= room) {
+            // Fill the buffer completely, then emit it as a block.
+            D_.Append(cursor, room);
+
+            Y_ASSERT(!D_.Avail());
+
+            cursor += room;
+            remaining -= room;
+
+            Y_VERIFY(FlushImpl(), "flush on writing failed");
+        } else {
+            // The tail fits without filling the buffer: keep it for later.
+            D_.Append(cursor, remaining);
+
+            return;
+        }
+    }
+}
+
+// Compresses the pending data and writes one block to the underlying stream:
+// a header (codec id, compressed length) followed by the compressed bytes.
+// An empty pending buffer still produces a block. Returns true when there
+// was pending data before the call.
+bool TCodedOutput::FlushImpl() {
+    constexpr size_t headerLen = sizeof(TCodecID) + sizeof(TBlockLen);
+    const bool hadData = !D_.Empty();
+
+    O_.Reserve(C_->MaxCompressedLength(D_) + headerLen);
+
+    // The compressed bytes go right after the space left for the header.
+    void* const body = O_.Data() + headerLen;
+    const size_t bodyLen = C_->Compress(D_, body);
+
+    {
+        TMemoryOutput header(O_.Data(), headerLen);
+
+        ::Save(&header, CodecID(C_));
+        ::Save(&header, SafeIntegerCast<TBlockLen>(bodyLen));
+    }
+
+    S_->Write(O_.Data(), headerLen + bodyLen);
+
+    D_.Clear();
+    O_.Clear();
+
+    return hadData;
+}
+
+// Emits the pending data as a block. Does nothing once the stream is
+// finished or when there is no pending data.
+void TCodedOutput::DoFlush() {
+    if (!S_ || D_.Empty()) {
+        return;
+    }
+
+    FlushImpl();
+}
+
+// Writes the remaining data followed by a zero-length block (the end-of-stream
+// marker) and detaches from the underlying stream. Later calls are no-ops.
+void TCodedOutput::DoFinish() {
+    if (!S_) {
+        return;
+    }
+
+    // Detach even if flushing throws.
+    Y_DEFER {
+        S_ = nullptr;
+    };
+
+    const bool hadData = FlushImpl();
+
+    if (hadData) {
+        //always write zero-length block as eos marker
+        FlushImpl();
+    }
+}
+
+// Reads blocks from `in` and accepts whichever registered codec each block names.
+TDecodedInput::TDecodedInput(IInputStream* in)
+    : TDecodedInput(in, nullptr)
+{
+}
+
+// Reads blocks from `in`. When `codec` is non-null, every block must have been
+// written with a codec of the same name, otherwise reading throws.
+TDecodedInput::TDecodedInput(IInputStream* in, const ICodec* codec)
+ : S_(in)
+ , C_(codec)
+{
+}
+
+// Out-of-line definition of the destructor declared in stream.h.
+TDecodedInput::~TDecodedInput() = default;
+
+// Reads and decodes the next block. On success `*ptr` points at the decoded
+// bytes (held in D_) and their count is returned. Returns 0 after the
+// zero-length end-of-stream block has been seen. Throws on a truncated stream,
+// an unknown or unexpected codec id, or an oversized block.
+size_t TDecodedInput::DoUnboundedNext(const void** ptr) {
+    if (!S_) {
+        return 0;
+    }
+
+    constexpr size_t headerLen = sizeof(TCodecID) + sizeof(TBlockLen);
+    constexpr TBlockLen maxBlockLen = 1024 * 1024 * 1024;
+
+    TCodecID codecId;
+    TBlockLen blockLen;
+
+    {
+        char header[32];
+        static_assert(headerLen <= sizeof(header), "block header must fit the scratch buffer");
+
+        S_->LoadOrFail(header, headerLen);
+
+        TMemoryInput headerIn(header, headerLen);
+
+        ::Load(&headerIn, codecId);
+        ::Load(&headerIn, blockLen);
+    }
+
+    // A zero-length block marks the end of the stream.
+    if (!blockLen) {
+        S_ = nullptr;
+
+        return 0;
+    }
+
+    // Refuse to allocate for an implausibly large compressed block.
+    if (Y_UNLIKELY(blockLen > maxBlockLen)) {
+        ythrow yexception() << "block size exceeds 1 GiB";
+    }
+
+    TBuffer compressed;
+    compressed.Resize(blockLen);
+
+    S_->LoadOrFail(compressed.Data(), blockLen);
+
+    const ICodec* const codec = CodecByID(codecId);
+
+    // When a codec was fixed at construction, the block must match it.
+    if (C_) {
+        Y_ENSURE(C_->Name() == codec->Name(), TStringBuf("incorrect stream codec"));
+    }
+
+    // The writer never buffers more than MAX_BUF_LEN per block.
+    if (codec->DecompressedLength(compressed) > MAX_BUF_LEN) {
+        ythrow yexception() << "broken stream";
+    }
+
+    codec->Decode(compressed, D_);
+    *ptr = D_.Data();
+
+    return D_.Size();
+}
diff --git a/library/cpp/blockcodecs/core/stream.h b/library/cpp/blockcodecs/core/stream.h
new file mode 100644
index 0000000000..fd44ef88f2
--- /dev/null
+++ b/library/cpp/blockcodecs/core/stream.h
@@ -0,0 +1,46 @@
+#pragma once
+
+#include <util/stream/walk.h>
+#include <util/stream/input.h>
+#include <util/stream/output.h>
+#include <util/stream/zerocopy.h>
+#include <util/generic/buffer.h>
+
+namespace NBlockCodecs {
+ struct ICodec;
+
+ // Output stream that buffers written data, compresses it block by block
+ // with a fixed codec and writes the blocks to another output stream.
+ class TCodedOutput: public IOutputStream {
+ public:
+ // `bufLen` is the size of the uncompressed buffer; see stream.cpp for its limit.
+ TCodedOutput(IOutputStream* out, const ICodec* c, size_t bufLen);
+ ~TCodedOutput() override;
+
+ private:
+ void DoWrite(const void* buf, size_t len) override;
+ void DoFlush() override;
+ void DoFinish() override;
+
+ // Compresses and writes the pending data as one block.
+ bool FlushImpl();
+
+ private:
+ const ICodec* C_; // codec used for every block
+ TBuffer D_; // pending uncompressed data
+ TBuffer O_; // scratch space for the encoded block
+ IOutputStream* S_; // destination; reset to nullptr by DoFinish()
+ };
+
+ // Input stream that reads blocks produced by TCodedOutput and exposes the
+ // decoded bytes one block at a time.
+ class TDecodedInput: public IWalkInput {
+ public:
+ // Accepts blocks written with any registered codec.
+ TDecodedInput(IInputStream* in);
+ // Accepts only blocks whose codec name matches `codec`.
+ TDecodedInput(IInputStream* in, const ICodec* codec);
+
+ ~TDecodedInput() override;
+
+ private:
+ size_t DoUnboundedNext(const void** ptr) override;
+
+ private:
+ TBuffer D_; // most recently decoded block
+ IInputStream* S_; // source; reset to nullptr at end of stream
+ const ICodec* C_; // expected codec, or nullptr for any
+ };
+}
diff --git a/library/cpp/blockcodecs/core/ya.make b/library/cpp/blockcodecs/core/ya.make
new file mode 100644
index 0000000000..069e15927b
--- /dev/null
+++ b/library/cpp/blockcodecs/core/ya.make
@@ -0,0 +1,10 @@
+# Core of the blockcodecs library: the codec registry and the block stream wrappers.
+LIBRARY()
+
+OWNER(pg)
+
+SRCS(
+ codecs.cpp
+ stream.cpp
+)
+
+END()