aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/blockcodecs/core/stream.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/blockcodecs/core/stream.cpp
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/blockcodecs/core/stream.cpp')
-rw-r--r--library/cpp/blockcodecs/core/stream.cpp362
1 files changed, 181 insertions, 181 deletions
diff --git a/library/cpp/blockcodecs/core/stream.cpp b/library/cpp/blockcodecs/core/stream.cpp
index 4f7db3c32b..c0134dea28 100644
--- a/library/cpp/blockcodecs/core/stream.cpp
+++ b/library/cpp/blockcodecs/core/stream.cpp
@@ -1,212 +1,212 @@
-#include "stream.h"
-#include "codecs.h"
-
-#include <util/digest/murmur.h>
-#include <util/generic/scope.h>
-#include <util/generic/cast.h>
-#include <util/generic/hash.h>
-#include <util/generic/singleton.h>
-#include <util/stream/mem.h>
-#include <util/ysaveload.h>
-
-using namespace NBlockCodecs;
-
-namespace {
+#include "stream.h"
+#include "codecs.h"
+
+#include <util/digest/murmur.h>
+#include <util/generic/scope.h>
+#include <util/generic/cast.h>
+#include <util/generic/hash.h>
+#include <util/generic/singleton.h>
+#include <util/stream/mem.h>
+#include <util/ysaveload.h>
+
+using namespace NBlockCodecs;
+
+namespace {
constexpr size_t MAX_BUF_LEN = 128 * 1024 * 1024;
-
- typedef ui16 TCodecID;
- typedef ui64 TBlockLen;
-
- struct TIds {
- inline TIds() {
- const TCodecList lst = ListAllCodecs();
-
+
+ typedef ui16 TCodecID;
+ typedef ui64 TBlockLen;
+
+ struct TIds {
+ inline TIds() {
+ const TCodecList lst = ListAllCodecs();
+
for (size_t i = 0; i < lst.size(); ++i) {
- const ICodec* c = Codec(lst[i]);
-
- ByID[CodecID(c)] = c;
- }
- }
-
- static inline TCodecID CodecID(const ICodec* c) {
- const TStringBuf name = c->Name();
-
- union {
- ui16 Parts[2];
- ui32 Data;
- } x;
-
+ const ICodec* c = Codec(lst[i]);
+
+ ByID[CodecID(c)] = c;
+ }
+ }
+
+ static inline TCodecID CodecID(const ICodec* c) {
+ const TStringBuf name = c->Name();
+
+ union {
+ ui16 Parts[2];
+ ui32 Data;
+ } x;
+
x.Data = MurmurHash<ui32>(name.data(), name.size());
-
- return x.Parts[1] ^ x.Parts[0];
- }
-
- inline const ICodec* Find(TCodecID id) const {
- TByID::const_iterator it = ByID.find(id);
-
- if (it != ByID.end()) {
- return it->second;
- }
-
- ythrow yexception() << "can not find codec by id " << id;
- }
-
+
+ return x.Parts[1] ^ x.Parts[0];
+ }
+
+ inline const ICodec* Find(TCodecID id) const {
+ TByID::const_iterator it = ByID.find(id);
+
+ if (it != ByID.end()) {
+ return it->second;
+ }
+
+ ythrow yexception() << "can not find codec by id " << id;
+ }
+
typedef THashMap<TCodecID, const ICodec*> TByID;
- TByID ByID;
- };
-
+ TByID ByID;
+ };
+
TCodecID CodecID(const ICodec* c) {
- return TIds::CodecID(c);
- }
-
+ return TIds::CodecID(c);
+ }
+
const ICodec* CodecByID(TCodecID id) {
- return Singleton<TIds>()->Find(id);
- }
-}
-
+ return Singleton<TIds>()->Find(id);
+ }
+}
+
TCodedOutput::TCodedOutput(IOutputStream* out, const ICodec* c, size_t bufLen)
- : C_(c)
- , D_(bufLen)
- , S_(out)
-{
- if (bufLen > MAX_BUF_LEN) {
+ : C_(c)
+ , D_(bufLen)
+ , S_(out)
+{
+ if (bufLen > MAX_BUF_LEN) {
ythrow yexception() << TStringBuf("too big buffer size: ") << bufLen;
- }
-}
-
+ }
+}
+
TCodedOutput::~TCodedOutput() {
- try {
- Finish();
- } catch (...) {
- }
-}
-
-void TCodedOutput::DoWrite(const void* buf, size_t len) {
- const char* in = (const char*)buf;
-
- while (len) {
- const size_t avail = D_.Avail();
-
- if (len < avail) {
- D_.Append(in, len);
-
- return;
- }
-
- D_.Append(in, avail);
-
+ try {
+ Finish();
+ } catch (...) {
+ }
+}
+
+void TCodedOutput::DoWrite(const void* buf, size_t len) {
+ const char* in = (const char*)buf;
+
+ while (len) {
+ const size_t avail = D_.Avail();
+
+ if (len < avail) {
+ D_.Append(in, len);
+
+ return;
+ }
+
+ D_.Append(in, avail);
+
Y_ASSERT(!D_.Avail());
-
- in += avail;
- len -= avail;
-
+
+ in += avail;
+ len -= avail;
+
Y_VERIFY(FlushImpl(), "flush on writing failed");
- }
-}
-
-bool TCodedOutput::FlushImpl() {
- const bool ret = !D_.Empty();
- const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen);
- O_.Reserve(C_->MaxCompressedLength(D_) + payload);
-
- void* out = O_.Data() + payload;
- const size_t olen = C_->Compress(D_, out);
-
- {
- TMemoryOutput mo(O_.Data(), payload);
-
- ::Save(&mo, CodecID(C_));
- ::Save(&mo, SafeIntegerCast<TBlockLen>(olen));
- }
-
- S_->Write(O_.Data(), payload + olen);
-
- D_.Clear();
- O_.Clear();
-
- return ret;
-}
-
-void TCodedOutput::DoFlush() {
- if (S_ && !D_.Empty()) {
- FlushImpl();
- }
-}
-
-void TCodedOutput::DoFinish() {
- if (S_) {
- Y_DEFER {
+ }
+}
+
+bool TCodedOutput::FlushImpl() {
+ const bool ret = !D_.Empty();
+ const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen);
+ O_.Reserve(C_->MaxCompressedLength(D_) + payload);
+
+ void* out = O_.Data() + payload;
+ const size_t olen = C_->Compress(D_, out);
+
+ {
+ TMemoryOutput mo(O_.Data(), payload);
+
+ ::Save(&mo, CodecID(C_));
+ ::Save(&mo, SafeIntegerCast<TBlockLen>(olen));
+ }
+
+ S_->Write(O_.Data(), payload + olen);
+
+ D_.Clear();
+ O_.Clear();
+
+ return ret;
+}
+
+void TCodedOutput::DoFlush() {
+ if (S_ && !D_.Empty()) {
+ FlushImpl();
+ }
+}
+
+void TCodedOutput::DoFinish() {
+ if (S_) {
+ Y_DEFER {
S_ = nullptr;
- };
-
- if (FlushImpl()) {
- //always write zero-length block as eos marker
- FlushImpl();
- }
- }
-}
-
+ };
+
+ if (FlushImpl()) {
+ //always write zero-length block as eos marker
+ FlushImpl();
+ }
+ }
+}
+
TDecodedInput::TDecodedInput(IInputStream* in)
- : S_(in)
- , C_(nullptr)
-{
-}
-
-TDecodedInput::TDecodedInput(IInputStream* in, const ICodec* codec)
- : S_(in)
- , C_(codec)
-{
-}
-
+ : S_(in)
+ , C_(nullptr)
+{
+}
+
+TDecodedInput::TDecodedInput(IInputStream* in, const ICodec* codec)
+ : S_(in)
+ , C_(codec)
+{
+}
+
TDecodedInput::~TDecodedInput() = default;
-
+
size_t TDecodedInput::DoUnboundedNext(const void** ptr) {
- if (!S_) {
+ if (!S_) {
return 0;
- }
-
+ }
+
TCodecID codecId;
TBlockLen blockLen;
-
- {
- const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen);
- char buf[32];
-
+
+ {
+ const size_t payload = sizeof(TCodecID) + sizeof(TBlockLen);
+ char buf[32];
+
S_->LoadOrFail(buf, payload);
-
- TMemoryInput in(buf, payload);
-
+
+ TMemoryInput in(buf, payload);
+
::Load(&in, codecId);
::Load(&in, blockLen);
- }
-
+ }
+
if (!blockLen) {
S_ = nullptr;
-
+
return 0;
- }
-
+ }
+
if (Y_UNLIKELY(blockLen > 1024 * 1024 * 1024)) {
- ythrow yexception() << "block size exceeds 1 GiB";
+ ythrow yexception() << "block size exceeds 1 GiB";
}
- TBuffer block;
+ TBuffer block;
block.Resize(blockLen);
-
+
S_->LoadOrFail(block.Data(), blockLen);
-
- auto codec = CodecByID(codecId);
-
- if (C_) {
+
+ auto codec = CodecByID(codecId);
+
+ if (C_) {
Y_ENSURE(C_->Name() == codec->Name(), TStringBuf("incorrect stream codec"));
- }
-
- if (codec->DecompressedLength(block) > MAX_BUF_LEN) {
- ythrow yexception() << "broken stream";
- }
-
- codec->Decode(block, D_);
- *ptr = D_.Data();
-
+ }
+
+ if (codec->DecompressedLength(block) > MAX_BUF_LEN) {
+ ythrow yexception() << "broken stream";
+ }
+
+ codec->Decode(block, D_);
+ *ptr = D_.Data();
+
return D_.Size();
-}
+}