diff options
| author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 | 
|---|---|---|
| committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 | 
| commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
| tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/http/io/chunk.cpp | |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/http/io/chunk.cpp')
| -rw-r--r-- | library/cpp/http/io/chunk.cpp | 246 | 
1 files changed, 246 insertions, 0 deletions
diff --git a/library/cpp/http/io/chunk.cpp b/library/cpp/http/io/chunk.cpp new file mode 100644 index 00000000000..6975d9eac1e --- /dev/null +++ b/library/cpp/http/io/chunk.cpp @@ -0,0 +1,246 @@ +#include "chunk.h" + +#include "headers.h" + +#include <util/string/cast.h> +#include <util/generic/utility.h> +#include <util/generic/yexception.h> + +static inline size_t ParseHex(const TString& s) { +    if (s.empty()) { +        ythrow yexception() << "can not parse chunk length(empty string)"; +    } + +    size_t ret = 0; + +    for (TString::const_iterator c = s.begin(); c != s.end(); ++c) { +        const char ch = *c; + +        if (ch >= '0' && ch <= '9') { +            ret *= 16; +            ret += ch - '0'; +        } else if (ch >= 'a' && ch <= 'f') { +            ret *= 16; +            ret += 10 + ch - 'a'; +        } else if (ch >= 'A' && ch <= 'F') { +            ret *= 16; +            ret += 10 + ch - 'A'; +        } else if (ch == ';') { +            break; +        } else if (isspace(ch)) { +            continue; +        } else { +            ythrow yexception() << "can not parse chunk length(" << s.data() << ")"; +        } +    } + +    return ret; +} + +static inline char* ToHex(size_t len, char* buf) { +    do { +        const size_t val = len % 16; + +        *--buf = (val < 10) ? (val + '0') : (val - 10 + 'a'); +        len /= 16; +    } while (len); + +    return buf; +} + +class TChunkedInput::TImpl { +public: +    inline TImpl(IInputStream* slave, TMaybe<THttpHeaders>* trailers) +        : Slave_(slave) +        , Trailers_(trailers) +        , Pending_(0) +        , LastChunkReaded_(false) +    { +        if (Trailers_) { +            Trailers_->Clear(); +        } +    } + +    inline ~TImpl() { +    } + +    inline size_t Read(void* buf, size_t len) { +        return Perform(len, [this, buf](size_t toRead) { return Slave_->Read(buf, toRead); }); +    } + +    inline size_t Skip(size_t len) { +        return Perform(len, [this](size_t toSkip) { return Slave_->Skip(toSkip); }); +    } + +private: +    template <class Operation> +    inline size_t Perform(size_t len, const Operation& operation) { +        if (!HavePendingData()) { +            return 0; +        } + +        const size_t toProcess = Min(Pending_, len); + +        if (toProcess) { +            const size_t processed = operation(toProcess); + +            if (!processed) { +                ythrow yexception() << "malformed http chunk"; +            } + +            Pending_ -= processed; + +            return processed; +        } + +        return 0; +    } + +    inline bool HavePendingData() { +        if (LastChunkReaded_) { +            return false; +        } + +        if (!Pending_) { +            if (!ProceedToNextChunk()) { +                return false; +            } +        } + +        return true; +    } + +    inline bool ProceedToNextChunk() { +        TString len(Slave_->ReadLine()); + +        if (len.empty()) { +            /* +             * skip crlf from previous chunk +             */ + +            len = Slave_->ReadLine(); +        } + +        Pending_ = ParseHex(len); + +        if (Pending_) { +            return true; +        } + +        if (Trailers_) { +            Trailers_->ConstructInPlace(Slave_); +        } +        LastChunkReaded_ = true; + +        return false; +    } + +private: +    IInputStream* Slave_; +    TMaybe<THttpHeaders>* Trailers_; +    size_t Pending_; +    bool LastChunkReaded_; +}; + +TChunkedInput::TChunkedInput(IInputStream* slave, TMaybe<THttpHeaders>* trailers) +    : Impl_(new TImpl(slave, trailers)) +{ +} + +TChunkedInput::~TChunkedInput() { +} + +size_t TChunkedInput::DoRead(void* buf, size_t len) { +    return Impl_->Read(buf, len); +} + +size_t TChunkedInput::DoSkip(size_t len) { +    return Impl_->Skip(len); +} + +class TChunkedOutput::TImpl { +    typedef IOutputStream::TPart TPart; + +public: +    inline TImpl(IOutputStream* slave) +        : Slave_(slave) +    { +    } + +    inline ~TImpl() { +    } + +    inline void Write(const void* buf, size_t len) { +        const char* ptr = (const char*)buf; + +        while (len) { +            const size_t portion = Min<size_t>(len, 1024 * 16); + +            WriteImpl(ptr, portion); + +            ptr += portion; +            len -= portion; +        } +    } + +    inline void WriteImpl(const void* buf, size_t len) { +        char tmp[32]; +        char* e = tmp + sizeof(tmp); +        char* b = ToHex(len, e); + +        const TPart parts[] = { +            TPart(b, e - b), +            TPart::CrLf(), +            TPart(buf, len), +            TPart::CrLf(), +        }; + +        Slave_->Write(parts, sizeof(parts) / sizeof(*parts)); +    } + +    inline void Flush() { +        Slave_->Flush(); +    } + +    inline void Finish() { +        Slave_->Write("0\r\n\r\n", 5); + +        Flush(); +    } + +private: +    IOutputStream* Slave_; +}; + +TChunkedOutput::TChunkedOutput(IOutputStream* slave) +    : Impl_(new TImpl(slave)) +{ +} + +TChunkedOutput::~TChunkedOutput() { +    try { +        Finish(); +    } catch (...) { +    } +} + +void TChunkedOutput::DoWrite(const void* buf, size_t len) { +    if (Impl_.Get()) { +        Impl_->Write(buf, len); +    } else { +        ythrow yexception() << "can not write to finished stream"; +    } +} + +void TChunkedOutput::DoFlush() { +    if (Impl_.Get()) { +        Impl_->Flush(); +    } +} + +void TChunkedOutput::DoFinish() { +    if (Impl_.Get()) { +        Impl_->Finish(); +        Impl_.Destroy(); +    } +}  | 
