diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/blockcodecs/core/stream.cpp | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/blockcodecs/core/stream.cpp')
-rw-r--r-- | library/cpp/blockcodecs/core/stream.cpp | 362 |
1 files changed, 181 insertions, 181 deletions
diff --git a/library/cpp/blockcodecs/core/stream.cpp b/library/cpp/blockcodecs/core/stream.cpp index 4f7db3c32b..c0134dea28 100644 --- a/library/cpp/blockcodecs/core/stream.cpp +++ b/library/cpp/blockcodecs/core/stream.cpp @@ -1,212 +1,212 @@ -#include "stream.h" -#include "codecs.h" - -#include <util/digest/murmur.h> -#include <util/generic/scope.h> -#include <util/generic/cast.h> -#include <util/generic/hash.h> -#include <util/generic/singleton.h> -#include <util/stream/mem.h> -#include <util/ysaveload.h> - -using namespace NBlockCodecs; - -namespace { +#include "stream.h" +#include "codecs.h" + +#include <util/digest/murmur.h> +#include <util/generic/scope.h> +#include <util/generic/cast.h> +#include <util/generic/hash.h> +#include <util/generic/singleton.h> +#include <util/stream/mem.h> +#include <util/ysaveload.h> + +using namespace NBlockCodecs; + +namespace { constexpr size_t MAX_BUF_LEN = 128 * 1024 * 1024; - - typedef ui16 TCodecID; - typedef ui64 TBlockLen; - - struct TIds { - inline TIds() { - const TCodecList lst = ListAllCodecs(); - + + typedef ui16 TCodecID; + typedef ui64 TBlockLen; + + struct TIds { + inline TIds() { + const TCodecList lst = ListAllCodecs(); + for (size_t i = 0; i < lst.size(); ++i) { - const ICodec* c = Codec(lst[i]); - - ByID[CodecID(c)] = c; - } - } - - static inline TCodecID CodecID(const ICodec* c) { - const TStringBuf name = c->Name(); - - union { - ui16 Parts[2]; - ui32 Data; - } x; - + const ICodec* c = Codec(lst[i]); + + ByID[CodecID(c)] = c; + } + } + + static inline TCodecID CodecID(const ICodec* c) { + const TStringBuf name = c->Name(); + + union { + ui16 Parts[2]; + ui32 Data; + } x; + x.Data = MurmurHash<ui32>(name.data(), name.size()); - - return x.Parts[1] ^ x.Parts[0]; - } - - inline const ICodec* Find(TCodecID id) const { - TByID::const_iterator it = ByID.find(id); - - if (it != ByID.end()) { - return it->second; - } - - ythrow yexception() << "can not find codec by id " << id; - } - + + return x.Parts[1] ^ x.Parts[0]; + } + + inline const ICodec* Find(TCodecID id) const { + TByID::const_iterator it = ByID.find(id); + + if (it != ByID.end()) { + return it->second; + } + + ythrow yexception() << "can not find codec by id " << id; + } + typedef THashMap<TCodecID, const ICodec*> TByID; - TByID ByID; - }; - + TByID ByID; + }; + TCodecID CodecID(const ICodec* c) { - return TIds::CodecID(c); - } - + return TIds::CodecID(c); + } + const ICodec* CodecByID(TCodecID id) { - return Singleton<TIds>()->Find(id); - } -} - + return Singleton<TIds>()->Find(id); + } +} + TCodedOutput::TCodedOutput(IOutputStream* out, const ICodec* c, size_t bufLen) - : C_(c) - , D_(bufLen) - , S_(out) -{ - if (bufLen > MAX_BUF_LEN) { + : C_(c) + , D_(bufLen) + , S_(out) +{ + if (bufLen > MAX_BUF_LEN) { ythrow yexception() << TStringBuf("too big buffer size: ") << bufLen; - } -} - + } +} + TCodedOutput::~TCodedOutput() { - try { - Finish(); - } catch (...) { - } -} - -void TCodedOutput::DoWrite(const void* buf, size_t len) { - const char* in = (const char*)buf; - - while (len) { - const size_t avail = D_.Avail(); - - if (len < avail) { - D_.Append(in, len); - - return; - } - - D_.Append(in, avail); - + try { + Finish(); + } catch (...) { + } +} + +void TCodedOutput::DoWrite(const void* buf, size_t len) { + const char* in = (const char*)buf; + + while (len) { + const size_t avail = D_.Avail(); + + if (len < avail) { + D_.Append(in, len); + + return; + } + + D_.Append(in, avail); + Y_ASSERT(!D_.Avail()); - - in += avail; - len -= avail; - + + in += avail; + len -= avail; + Y_VERIFY(FlushImpl(), "flush on writing failed"); - } -} - -bool TCodedOutput::FlushImpl() { - const bool ret = !D_.Empty(); - const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen); - O_.Reserve(C_->MaxCompressedLength(D_) + payload); - - void* out = O_.Data() + payload; - const size_t olen = C_->Compress(D_, out); - - { - TMemoryOutput mo(O_.Data(), payload); - - ::Save(&mo, CodecID(C_)); - ::Save(&mo, SafeIntegerCast<TBlockLen>(olen)); - } - - S_->Write(O_.Data(), payload + olen); - - D_.Clear(); - O_.Clear(); - - return ret; -} - -void TCodedOutput::DoFlush() { - if (S_ && !D_.Empty()) { - FlushImpl(); - } -} - -void TCodedOutput::DoFinish() { - if (S_) { - Y_DEFER { + } +} + +bool TCodedOutput::FlushImpl() { + const bool ret = !D_.Empty(); + const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen); + O_.Reserve(C_->MaxCompressedLength(D_) + payload); + + void* out = O_.Data() + payload; + const size_t olen = C_->Compress(D_, out); + + { + TMemoryOutput mo(O_.Data(), payload); + + ::Save(&mo, CodecID(C_)); + ::Save(&mo, SafeIntegerCast<TBlockLen>(olen)); + } + + S_->Write(O_.Data(), payload + olen); + + D_.Clear(); + O_.Clear(); + + return ret; +} + +void TCodedOutput::DoFlush() { + if (S_ && !D_.Empty()) { + FlushImpl(); + } +} + +void TCodedOutput::DoFinish() { + if (S_) { + Y_DEFER { S_ = nullptr; - }; - - if (FlushImpl()) { - //always write zero-length block as eos marker - FlushImpl(); - } - } -} - + }; + + if (FlushImpl()) { + //always write zero-length block as eos marker + FlushImpl(); + } + } +} + TDecodedInput::TDecodedInput(IInputStream* in) - : S_(in) - , C_(nullptr) -{ -} - -TDecodedInput::TDecodedInput(IInputStream* in, const ICodec* codec) - : S_(in) - , C_(codec) -{ -} - + : S_(in) + , C_(nullptr) +{ +} + +TDecodedInput::TDecodedInput(IInputStream* in, const ICodec* codec) + : S_(in) + , C_(codec) +{ +} + TDecodedInput::~TDecodedInput() = default; - + size_t TDecodedInput::DoUnboundedNext(const void** ptr) { - if (!S_) { + if (!S_) { return 0; - } - + } + TCodecID codecId; TBlockLen blockLen; - - { - const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen); - char buf[32]; - + + { + const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen); + char buf[32]; + S_->LoadOrFail(buf, payload); - - TMemoryInput in(buf, payload); - + + TMemoryInput in(buf, payload); + ::Load(&in, codecId); ::Load(&in, blockLen); - } - + } + if (!blockLen) { S_ = nullptr; - + return 0; - } - + } + if (Y_UNLIKELY(blockLen > 1024 * 1024 * 1024)) { - ythrow yexception() << "block size exceeds 1 GiB"; + ythrow yexception() << "block size exceeds 1 GiB"; } - TBuffer block; + TBuffer block; block.Resize(blockLen); - + S_->LoadOrFail(block.Data(), blockLen); - - auto codec = CodecByID(codecId); - - if (C_) { + + auto codec = CodecByID(codecId); + + if (C_) { Y_ENSURE(C_->Name() == codec->Name(), TStringBuf("incorrect stream codec")); - } - - if (codec->DecompressedLength(block) > MAX_BUF_LEN) { - ythrow yexception() << "broken stream"; - } - - codec->Decode(block, D_); - *ptr = D_.Data(); - + } + + if (codec->DecompressedLength(block) > MAX_BUF_LEN) { + ythrow yexception() << "broken stream"; + } + + codec->Decode(block, D_); + *ptr = D_.Data(); + return D_.Size(); -} +} |