diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/http/io/chunk.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/http/io/chunk.cpp')
-rw-r--r-- | library/cpp/http/io/chunk.cpp | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/library/cpp/http/io/chunk.cpp b/library/cpp/http/io/chunk.cpp new file mode 100644 index 00000000000..6975d9eac1e --- /dev/null +++ b/library/cpp/http/io/chunk.cpp @@ -0,0 +1,246 @@ +#include "chunk.h" + +#include "headers.h" + +#include <util/string/cast.h> +#include <util/generic/utility.h> +#include <util/generic/yexception.h> + +static inline size_t ParseHex(const TString& s) { + if (s.empty()) { + ythrow yexception() << "can not parse chunk length(empty string)"; + } + + size_t ret = 0; + + for (TString::const_iterator c = s.begin(); c != s.end(); ++c) { + const char ch = *c; + + if (ch >= '0' && ch <= '9') { + ret *= 16; + ret += ch - '0'; + } else if (ch >= 'a' && ch <= 'f') { + ret *= 16; + ret += 10 + ch - 'a'; + } else if (ch >= 'A' && ch <= 'F') { + ret *= 16; + ret += 10 + ch - 'A'; + } else if (ch == ';') { + break; + } else if (isspace(ch)) { + continue; + } else { + ythrow yexception() << "can not parse chunk length(" << s.data() << ")"; + } + } + + return ret; +} + +static inline char* ToHex(size_t len, char* buf) { + do { + const size_t val = len % 16; + + *--buf = (val < 10) ? (val + '0') : (val - 10 + 'a'); + len /= 16; + } while (len); + + return buf; +} + +class TChunkedInput::TImpl { +public: + inline TImpl(IInputStream* slave, TMaybe<THttpHeaders>* trailers) + : Slave_(slave) + , Trailers_(trailers) + , Pending_(0) + , LastChunkReaded_(false) + { + if (Trailers_) { + Trailers_->Clear(); + } + } + + inline ~TImpl() { + } + + inline size_t Read(void* buf, size_t len) { + return Perform(len, [this, buf](size_t toRead) { return Slave_->Read(buf, toRead); }); + } + + inline size_t Skip(size_t len) { + return Perform(len, [this](size_t toSkip) { return Slave_->Skip(toSkip); }); + } + +private: + template <class Operation> + inline size_t Perform(size_t len, const Operation& operation) { + if (!HavePendingData()) { + return 0; + } + + const size_t toProcess = Min(Pending_, len); + + if (toProcess) { + const size_t processed = operation(toProcess); + + if (!processed) { + ythrow yexception() << "malformed http chunk"; + } + + Pending_ -= processed; + + return processed; + } + + return 0; + } + + inline bool HavePendingData() { + if (LastChunkReaded_) { + return false; + } + + if (!Pending_) { + if (!ProceedToNextChunk()) { + return false; + } + } + + return true; + } + + inline bool ProceedToNextChunk() { + TString len(Slave_->ReadLine()); + + if (len.empty()) { + /* + * skip crlf from previous chunk + */ + + len = Slave_->ReadLine(); + } + + Pending_ = ParseHex(len); + + if (Pending_) { + return true; + } + + if (Trailers_) { + Trailers_->ConstructInPlace(Slave_); + } + LastChunkReaded_ = true; + + return false; + } + +private: + IInputStream* Slave_; + TMaybe<THttpHeaders>* Trailers_; + size_t Pending_; + bool LastChunkReaded_; +}; + +TChunkedInput::TChunkedInput(IInputStream* slave, TMaybe<THttpHeaders>* trailers) + : Impl_(new TImpl(slave, trailers)) +{ +} + +TChunkedInput::~TChunkedInput() { +} + +size_t TChunkedInput::DoRead(void* buf, size_t len) { + return Impl_->Read(buf, len); +} + +size_t TChunkedInput::DoSkip(size_t len) { + return Impl_->Skip(len); +} + +class TChunkedOutput::TImpl { + typedef IOutputStream::TPart TPart; + +public: + inline TImpl(IOutputStream* slave) + : Slave_(slave) + { + } + + inline ~TImpl() { + } + + inline void Write(const void* buf, size_t len) { + const char* ptr = (const char*)buf; + + while (len) { + const size_t portion = Min<size_t>(len, 1024 * 16); + + WriteImpl(ptr, portion); + + ptr += portion; + len -= portion; + } + } + + inline void WriteImpl(const void* buf, size_t len) { + char tmp[32]; + char* e = tmp + sizeof(tmp); + char* b = ToHex(len, e); + + const TPart parts[] = { + TPart(b, e - b), + TPart::CrLf(), + TPart(buf, len), + TPart::CrLf(), + }; + + Slave_->Write(parts, sizeof(parts) / sizeof(*parts)); + } + + inline void Flush() { + Slave_->Flush(); + } + + inline void Finish() { + Slave_->Write("0\r\n\r\n", 5); + + Flush(); + } + +private: + IOutputStream* Slave_; +}; + +TChunkedOutput::TChunkedOutput(IOutputStream* slave) + : Impl_(new TImpl(slave)) +{ +} + +TChunkedOutput::~TChunkedOutput() { + try { + Finish(); + } catch (...) { + } +} + +void TChunkedOutput::DoWrite(const void* buf, size_t len) { + if (Impl_.Get()) { + Impl_->Write(buf, len); + } else { + ythrow yexception() << "can not write to finished stream"; + } +} + +void TChunkedOutput::DoFlush() { + if (Impl_.Get()) { + Impl_->Flush(); + } +} + +void TChunkedOutput::DoFinish() { + if (Impl_.Get()) { + Impl_->Finish(); + Impl_.Destroy(); + } +} |