aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/monlib/encode/spack/compression.cpp
diff options
context:
space:
mode:
authorSergey Polovko <sergey@polovko.me>2022-02-10 16:47:02 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:02 +0300
commit3e0b762a82514bac89c1dd6ea7211e381d8aa248 (patch)
treec2d1b379ecaf05ca8f11ed0b5da9d1a950e6e554 /library/cpp/monlib/encode/spack/compression.cpp
parentab3783171cc30e262243a0227c86118f7080c896 (diff)
downloadydb-3e0b762a82514bac89c1dd6ea7211e381d8aa248.tar.gz
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/monlib/encode/spack/compression.cpp')
-rw-r--r--library/cpp/monlib/encode/spack/compression.cpp602
1 files changed, 301 insertions, 301 deletions
diff --git a/library/cpp/monlib/encode/spack/compression.cpp b/library/cpp/monlib/encode/spack/compression.cpp
index 0d2152fc85..0ad1eee866 100644
--- a/library/cpp/monlib/encode/spack/compression.cpp
+++ b/library/cpp/monlib/encode/spack/compression.cpp
@@ -1,213 +1,213 @@
-#include "compression.h"
-
+#include "compression.h"
+
#include <util/generic/buffer.h>
-#include <util/generic/cast.h>
-#include <util/generic/ptr.h>
+#include <util/generic/cast.h>
+#include <util/generic/ptr.h>
#include <util/generic/scope.h>
#include <util/generic/size_literals.h>
#include <util/stream/format.h>
#include <util/stream/output.h>
-#include <util/stream/walk.h>
-
-#include <contrib/libs/lz4/lz4.h>
+#include <util/stream/walk.h>
+
+#include <contrib/libs/lz4/lz4.h>
#include <contrib/libs/xxhash/xxhash.h>
-#include <contrib/libs/zlib/zlib.h>
-#define ZSTD_STATIC_LINKING_ONLY
+#include <contrib/libs/zlib/zlib.h>
+#define ZSTD_STATIC_LINKING_ONLY
#include <contrib/libs/zstd/include/zstd.h>
-
-namespace NMonitoring {
- namespace {
- ///////////////////////////////////////////////////////////////////////////////
- // Frame
- ///////////////////////////////////////////////////////////////////////////////
- using TCompressedSize = ui32;
- using TUncompressedSize = ui32;
- using TCheckSum = ui32;
-
+
+namespace NMonitoring {
+ namespace {
+ ///////////////////////////////////////////////////////////////////////////////
+ // Frame
+ ///////////////////////////////////////////////////////////////////////////////
+ using TCompressedSize = ui32;
+ using TUncompressedSize = ui32;
+ using TCheckSum = ui32;
+
constexpr size_t COMPRESSED_FRAME_SIZE_LIMIT = 512_KB;
constexpr size_t UNCOMPRESSED_FRAME_SIZE_LIMIT = COMPRESSED_FRAME_SIZE_LIMIT;
constexpr size_t FRAME_SIZE_LIMIT = 2_MB;
constexpr size_t DEFAULT_FRAME_LEN = 64_KB;
-
+
struct Y_PACKED TFrameHeader {
- TCompressedSize CompressedSize;
- TUncompressedSize UncompressedSize;
- };
-
+ TCompressedSize CompressedSize;
+ TUncompressedSize UncompressedSize;
+ };
+
struct Y_PACKED TFrameFooter {
- TCheckSum CheckSum;
- };
-
- ///////////////////////////////////////////////////////////////////////////////
- // TBlock
- ///////////////////////////////////////////////////////////////////////////////
- struct TBlock: public TStringBuf {
- template <typename T>
- TBlock(T&& t)
+ TCheckSum CheckSum;
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // TBlock
+ ///////////////////////////////////////////////////////////////////////////////
+ struct TBlock: public TStringBuf {
+ template <typename T>
+ TBlock(T&& t)
: TStringBuf(t.data(), t.size())
- {
+ {
Y_ENSURE(t.data() != nullptr);
- }
-
+ }
+
char* data() noexcept {
return const_cast<char*>(TStringBuf::data());
- }
- };
-
- ///////////////////////////////////////////////////////////////////////////////
- // XXHASH
- ///////////////////////////////////////////////////////////////////////////////
- struct TXxHash32 {
- static TCheckSum Calc(TBlock in) {
- static const ui32 SEED = 0x1337c0de;
+ }
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // XXHASH
+ ///////////////////////////////////////////////////////////////////////////////
+ struct TXxHash32 {
+ static TCheckSum Calc(TBlock in) {
+ static const ui32 SEED = 0x1337c0de;
return XXH32(in.data(), in.size(), SEED);
- }
-
- static bool Check(TBlock in, TCheckSum checksum) {
- return Calc(in) == checksum;
- }
- };
-
- ///////////////////////////////////////////////////////////////////////////////
- // Adler32
- ///////////////////////////////////////////////////////////////////////////////
- struct TAdler32 {
- static TCheckSum Calc(TBlock in) {
+ }
+
+ static bool Check(TBlock in, TCheckSum checksum) {
+ return Calc(in) == checksum;
+ }
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // Adler32
+ ///////////////////////////////////////////////////////////////////////////////
+ struct TAdler32 {
+ static TCheckSum Calc(TBlock in) {
return adler32(1L, reinterpret_cast<const Bytef*>(in.data()), in.size());
- }
-
- static bool Check(TBlock in, TCheckSum checksum) {
- return Calc(in) == checksum;
- }
- };
-
- ///////////////////////////////////////////////////////////////////////////////
- // LZ4
- ///////////////////////////////////////////////////////////////////////////////
- struct TLz4Codec {
- static size_t MaxCompressedLength(size_t in) {
- int result = LZ4_compressBound(static_cast<int>(in));
- Y_ENSURE(result != 0, "lz4 input size is too large");
- return result;
- }
-
- static size_t Compress(TBlock in, TBlock out) {
- int rc = LZ4_compress_default(
+ }
+
+ static bool Check(TBlock in, TCheckSum checksum) {
+ return Calc(in) == checksum;
+ }
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // LZ4
+ ///////////////////////////////////////////////////////////////////////////////
+ struct TLz4Codec {
+ static size_t MaxCompressedLength(size_t in) {
+ int result = LZ4_compressBound(static_cast<int>(in));
+ Y_ENSURE(result != 0, "lz4 input size is too large");
+ return result;
+ }
+
+ static size_t Compress(TBlock in, TBlock out) {
+ int rc = LZ4_compress_default(
in.data(),
out.data(),
SafeIntegerCast<int>(in.size()),
SafeIntegerCast<int>(out.size()));
- Y_ENSURE(rc != 0, "lz4 compression failed");
- return rc;
- }
-
- static void Decompress(TBlock in, TBlock out) {
- int rc = LZ4_decompress_safe(
+ Y_ENSURE(rc != 0, "lz4 compression failed");
+ return rc;
+ }
+
+ static void Decompress(TBlock in, TBlock out) {
+ int rc = LZ4_decompress_safe(
in.data(),
out.data(),
SafeIntegerCast<int>(in.size()),
SafeIntegerCast<int>(out.size()));
- Y_ENSURE(rc >= 0, "the lz4 stream is detected malformed");
- }
- };
-
- ///////////////////////////////////////////////////////////////////////////////
- // ZSTD
- ///////////////////////////////////////////////////////////////////////////////
- struct TZstdCodec {
- static const int LEVEL = 11;
-
- static size_t MaxCompressedLength(size_t in) {
- return ZSTD_compressBound(in);
- }
-
- static size_t Compress(TBlock in, TBlock out) {
+ Y_ENSURE(rc >= 0, "the lz4 stream is detected malformed");
+ }
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // ZSTD
+ ///////////////////////////////////////////////////////////////////////////////
+ struct TZstdCodec {
+ static const int LEVEL = 11;
+
+ static size_t MaxCompressedLength(size_t in) {
+ return ZSTD_compressBound(in);
+ }
+
+ static size_t Compress(TBlock in, TBlock out) {
size_t rc = ZSTD_compress(out.data(), out.size(), in.data(), in.size(), LEVEL);
- if (Y_UNLIKELY(ZSTD_isError(rc))) {
+ if (Y_UNLIKELY(ZSTD_isError(rc))) {
ythrow yexception() << TStringBuf("zstd compression failed: ")
- << ZSTD_getErrorName(rc);
- }
- return rc;
- }
-
- static void Decompress(TBlock in, TBlock out) {
+ << ZSTD_getErrorName(rc);
+ }
+ return rc;
+ }
+
+ static void Decompress(TBlock in, TBlock out) {
size_t rc = ZSTD_decompress(out.data(), out.size(), in.data(), in.size());
- if (Y_UNLIKELY(ZSTD_isError(rc))) {
+ if (Y_UNLIKELY(ZSTD_isError(rc))) {
ythrow yexception() << TStringBuf("zstd decompression failed: ")
- << ZSTD_getErrorName(rc);
- }
+ << ZSTD_getErrorName(rc);
+ }
Y_ENSURE(rc == out.size(), "zstd decompressed wrong size");
- }
- };
-
- ///////////////////////////////////////////////////////////////////////////////
- // ZLIB
- ///////////////////////////////////////////////////////////////////////////////
- struct TZlibCodec {
- static const int LEVEL = 6;
-
- static size_t MaxCompressedLength(size_t in) {
- return compressBound(in);
- }
-
- static size_t Compress(TBlock in, TBlock out) {
+ }
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // ZLIB
+ ///////////////////////////////////////////////////////////////////////////////
+ struct TZlibCodec {
+ static const int LEVEL = 6;
+
+ static size_t MaxCompressedLength(size_t in) {
+ return compressBound(in);
+ }
+
+ static size_t Compress(TBlock in, TBlock out) {
uLong ret = out.size();
- int rc = compress2(
+ int rc = compress2(
reinterpret_cast<Bytef*>(out.data()),
- &ret,
+ &ret,
reinterpret_cast<const Bytef*>(in.data()),
in.size(),
- LEVEL);
- Y_ENSURE(rc == Z_OK, "zlib compression failed");
- return ret;
- }
-
- static void Decompress(TBlock in, TBlock out) {
+ LEVEL);
+ Y_ENSURE(rc == Z_OK, "zlib compression failed");
+ return ret;
+ }
+
+ static void Decompress(TBlock in, TBlock out) {
uLong ret = out.size();
- int rc = uncompress(
+ int rc = uncompress(
reinterpret_cast<Bytef*>(out.data()),
- &ret,
+ &ret,
reinterpret_cast<const Bytef*>(in.data()),
in.size());
- Y_ENSURE(rc == Z_OK, "zlib decompression failed");
+ Y_ENSURE(rc == Z_OK, "zlib decompression failed");
Y_ENSURE(ret == out.size(), "zlib decompressed wrong size");
- }
- };
-
- //
- // Framed streams use next frame structure:
- //
- // +-----------------+-------------------+============+------------------+
- // | compressed size | uncompressed size | data | check sum |
- // +-----------------+-------------------+============+------------------+
- // 4 bytes 4 bytes var len 4 bytes
- //
-
- ///////////////////////////////////////////////////////////////////////////////
- // TFramedInputStream
- ///////////////////////////////////////////////////////////////////////////////
- template <typename TCodecAlg, typename TCheckSumAlg>
- class TFramedDecompressStream final: public IWalkInput {
- public:
- explicit TFramedDecompressStream(IInputStream* in)
- : In_(in)
- {
- }
-
- private:
- size_t DoUnboundedNext(const void** ptr) override {
- if (!In_) {
- return 0;
- }
-
- TFrameHeader header;
- In_->LoadOrFail(&header, sizeof(header));
-
- if (header.CompressedSize == 0) {
- In_ = nullptr;
- return 0;
- }
-
+ }
+ };
+
+ //
+ // Framed streams use next frame structure:
+ //
+ // +-----------------+-------------------+============+------------------+
+ // | compressed size | uncompressed size | data | check sum |
+ // +-----------------+-------------------+============+------------------+
+ // 4 bytes 4 bytes var len 4 bytes
+ //
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // TFramedInputStream
+ ///////////////////////////////////////////////////////////////////////////////
+ template <typename TCodecAlg, typename TCheckSumAlg>
+ class TFramedDecompressStream final: public IWalkInput {
+ public:
+ explicit TFramedDecompressStream(IInputStream* in)
+ : In_(in)
+ {
+ }
+
+ private:
+ size_t DoUnboundedNext(const void** ptr) override {
+ if (!In_) {
+ return 0;
+ }
+
+ TFrameHeader header;
+ In_->LoadOrFail(&header, sizeof(header));
+
+ if (header.CompressedSize == 0) {
+ In_ = nullptr;
+ return 0;
+ }
+
Y_ENSURE(header.CompressedSize <= COMPRESSED_FRAME_SIZE_LIMIT, "Compressed frame size is limited to "
<< HumanReadableSize(COMPRESSED_FRAME_SIZE_LIMIT, SF_BYTES)
<< " but is " << HumanReadableSize(header.CompressedSize, SF_BYTES));
@@ -216,87 +216,87 @@ namespace NMonitoring {
<< HumanReadableSize(UNCOMPRESSED_FRAME_SIZE_LIMIT, SF_BYTES)
<< " but is " << HumanReadableSize(header.UncompressedSize, SF_BYTES));
- Compressed_.Resize(header.CompressedSize);
- In_->LoadOrFail(Compressed_.Data(), header.CompressedSize);
-
- TFrameFooter footer;
- In_->LoadOrFail(&footer, sizeof(footer));
- Y_ENSURE(TCheckSumAlg::Check(Compressed_, footer.CheckSum),
- "corrupted stream: check sum mismatch");
-
- Uncompressed_.Resize(header.UncompressedSize);
- TCodecAlg::Decompress(Compressed_, Uncompressed_);
-
- *ptr = Uncompressed_.Data();
- return Uncompressed_.Size();
- }
-
- private:
- IInputStream* In_;
- TBuffer Compressed_;
- TBuffer Uncompressed_;
- };
-
- ///////////////////////////////////////////////////////////////////////////////
- // TFramedOutputStream
- ///////////////////////////////////////////////////////////////////////////////
- template <typename TCodecAlg, typename TCheckSumAlg>
+ Compressed_.Resize(header.CompressedSize);
+ In_->LoadOrFail(Compressed_.Data(), header.CompressedSize);
+
+ TFrameFooter footer;
+ In_->LoadOrFail(&footer, sizeof(footer));
+ Y_ENSURE(TCheckSumAlg::Check(Compressed_, footer.CheckSum),
+ "corrupted stream: check sum mismatch");
+
+ Uncompressed_.Resize(header.UncompressedSize);
+ TCodecAlg::Decompress(Compressed_, Uncompressed_);
+
+ *ptr = Uncompressed_.Data();
+ return Uncompressed_.Size();
+ }
+
+ private:
+ IInputStream* In_;
+ TBuffer Compressed_;
+ TBuffer Uncompressed_;
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // TFramedOutputStream
+ ///////////////////////////////////////////////////////////////////////////////
+ template <typename TCodecAlg, typename TCheckSumAlg>
class TFramedCompressStream final: public IFramedCompressStream {
- public:
- explicit TFramedCompressStream(IOutputStream* out)
- : Out_(out)
- , Uncompressed_(DEFAULT_FRAME_LEN)
- {
- }
-
+ public:
+ explicit TFramedCompressStream(IOutputStream* out)
+ : Out_(out)
+ , Uncompressed_(DEFAULT_FRAME_LEN)
+ {
+ }
+
~TFramedCompressStream() override {
- try {
- Finish();
- } catch (...) {
- }
- }
-
- private:
- void DoWrite(const void* buf, size_t len) override {
- const char* in = static_cast<const char*>(buf);
-
- while (len != 0) {
- const size_t avail = Uncompressed_.Avail();
- if (len < avail) {
- Uncompressed_.Append(in, len);
- return;
- }
-
- Uncompressed_.Append(in, avail);
- Y_ASSERT(Uncompressed_.Avail() == 0);
-
- in += avail;
- len -= avail;
-
- WriteCompressedFrame();
- }
- }
-
+ try {
+ Finish();
+ } catch (...) {
+ }
+ }
+
+ private:
+ void DoWrite(const void* buf, size_t len) override {
+ const char* in = static_cast<const char*>(buf);
+
+ while (len != 0) {
+ const size_t avail = Uncompressed_.Avail();
+ if (len < avail) {
+ Uncompressed_.Append(in, len);
+ return;
+ }
+
+ Uncompressed_.Append(in, avail);
+ Y_ASSERT(Uncompressed_.Avail() == 0);
+
+ in += avail;
+ len -= avail;
+
+ WriteCompressedFrame();
+ }
+ }
+
void FlushWithoutEmptyFrame() override {
- if (Out_ && !Uncompressed_.Empty()) {
- WriteCompressedFrame();
- }
- }
-
+ if (Out_ && !Uncompressed_.Empty()) {
+ WriteCompressedFrame();
+ }
+ }
+
void FinishAndWriteEmptyFrame() override {
- if (Out_) {
+ if (Out_) {
Y_DEFER {
- Out_ = nullptr;
+ Out_ = nullptr;
};
if (!Uncompressed_.Empty()) {
WriteCompressedFrame();
- }
+ }
WriteEmptyFrame();
- }
- }
-
+ }
+ }
+
void DoFlush() override {
FlushWithoutEmptyFrame();
}
@@ -305,79 +305,79 @@ namespace NMonitoring {
FinishAndWriteEmptyFrame();
}
- void WriteCompressedFrame() {
- static const auto framePayload = sizeof(TFrameHeader) + sizeof(TFrameFooter);
+ void WriteCompressedFrame() {
+ static const auto framePayload = sizeof(TFrameHeader) + sizeof(TFrameFooter);
const auto maxFrameSize = ui64(TCodecAlg::MaxCompressedLength(Uncompressed_.Size())) + framePayload;
Y_ENSURE(maxFrameSize <= FRAME_SIZE_LIMIT, "Frame size in encoder is limited to "
<< HumanReadableSize(FRAME_SIZE_LIMIT, SF_BYTES)
<< " but is " << HumanReadableSize(maxFrameSize, SF_BYTES));
-
+
Frame_.Resize(maxFrameSize);
- // compress
- TBlock compressedBlock = Frame_;
- compressedBlock.Skip(sizeof(TFrameHeader));
- compressedBlock.Trunc(TCodecAlg::Compress(Uncompressed_, compressedBlock));
-
- // add header
- auto header = reinterpret_cast<TFrameHeader*>(Frame_.Data());
+ // compress
+ TBlock compressedBlock = Frame_;
+ compressedBlock.Skip(sizeof(TFrameHeader));
+ compressedBlock.Trunc(TCodecAlg::Compress(Uncompressed_, compressedBlock));
+
+ // add header
+ auto header = reinterpret_cast<TFrameHeader*>(Frame_.Data());
header->CompressedSize = SafeIntegerCast<TCompressedSize>(compressedBlock.size());
- header->UncompressedSize = SafeIntegerCast<TUncompressedSize>(Uncompressed_.Size());
-
- // add footer
- auto footer = reinterpret_cast<TFrameFooter*>(
- Frame_.Data() + sizeof(TFrameHeader) + header->CompressedSize);
- footer->CheckSum = TCheckSumAlg::Calc(compressedBlock);
-
- // write
- Out_->Write(Frame_.Data(), header->CompressedSize + framePayload);
- Uncompressed_.Clear();
- }
-
- void WriteEmptyFrame() {
- static const auto framePayload = sizeof(TFrameHeader) + sizeof(TFrameFooter);
- char buf[framePayload] = {0};
- Out_->Write(buf, sizeof(buf));
- }
-
- private:
- IOutputStream* Out_;
- TBuffer Uncompressed_;
- TBuffer Frame_;
- };
-
- }
-
- THolder<IInputStream> CompressedInput(IInputStream* in, ECompression alg) {
- switch (alg) {
- case ECompression::IDENTITY:
- return nullptr;
- case ECompression::ZLIB:
+ header->UncompressedSize = SafeIntegerCast<TUncompressedSize>(Uncompressed_.Size());
+
+ // add footer
+ auto footer = reinterpret_cast<TFrameFooter*>(
+ Frame_.Data() + sizeof(TFrameHeader) + header->CompressedSize);
+ footer->CheckSum = TCheckSumAlg::Calc(compressedBlock);
+
+ // write
+ Out_->Write(Frame_.Data(), header->CompressedSize + framePayload);
+ Uncompressed_.Clear();
+ }
+
+ void WriteEmptyFrame() {
+ static const auto framePayload = sizeof(TFrameHeader) + sizeof(TFrameFooter);
+ char buf[framePayload] = {0};
+ Out_->Write(buf, sizeof(buf));
+ }
+
+ private:
+ IOutputStream* Out_;
+ TBuffer Uncompressed_;
+ TBuffer Frame_;
+ };
+
+ }
+
+ THolder<IInputStream> CompressedInput(IInputStream* in, ECompression alg) {
+ switch (alg) {
+ case ECompression::IDENTITY:
+ return nullptr;
+ case ECompression::ZLIB:
return MakeHolder<TFramedDecompressStream<TZlibCodec, TAdler32>>(in);
- case ECompression::ZSTD:
+ case ECompression::ZSTD:
return MakeHolder<TFramedDecompressStream<TZstdCodec, TXxHash32>>(in);
- case ECompression::LZ4:
+ case ECompression::LZ4:
return MakeHolder<TFramedDecompressStream<TLz4Codec, TXxHash32>>(in);
- case ECompression::UNKNOWN:
- return nullptr;
- }
- Y_FAIL("invalid compression algorithm");
- }
-
+ case ECompression::UNKNOWN:
+ return nullptr;
+ }
+ Y_FAIL("invalid compression algorithm");
+ }
+
THolder<IFramedCompressStream> CompressedOutput(IOutputStream* out, ECompression alg) {
- switch (alg) {
- case ECompression::IDENTITY:
- return nullptr;
- case ECompression::ZLIB:
+ switch (alg) {
+ case ECompression::IDENTITY:
+ return nullptr;
+ case ECompression::ZLIB:
return MakeHolder<TFramedCompressStream<TZlibCodec, TAdler32>>(out);
- case ECompression::ZSTD:
+ case ECompression::ZSTD:
return MakeHolder<TFramedCompressStream<TZstdCodec, TXxHash32>>(out);
- case ECompression::LZ4:
+ case ECompression::LZ4:
return MakeHolder<TFramedCompressStream<TLz4Codec, TXxHash32>>(out);
- case ECompression::UNKNOWN:
- return nullptr;
- }
- Y_FAIL("invalid compression algorithm");
- }
-
-}
+ case ECompression::UNKNOWN:
+ return nullptr;
+ }
+ Y_FAIL("invalid compression algorithm");
+ }
+
+}