diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/http/push_parser | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'library/cpp/http/push_parser')
-rw-r--r-- | library/cpp/http/push_parser/http_parser.cpp | 345 | ||||
-rw-r--r-- | library/cpp/http/push_parser/http_parser.h | 165 |
2 files changed, 510 insertions, 0 deletions
diff --git a/library/cpp/http/push_parser/http_parser.cpp b/library/cpp/http/push_parser/http_parser.cpp new file mode 100644 index 00000000000..d36618069fe --- /dev/null +++ b/library/cpp/http/push_parser/http_parser.cpp @@ -0,0 +1,345 @@ +#include "http_parser.h" + +#include <library/cpp/blockcodecs/stream.h> +#include <library/cpp/blockcodecs/codecs.h> + +#include <util/generic/string.h> +#include <util/generic/yexception.h> +#include <util/stream/mem.h> +#include <util/stream/zlib.h> +#include <util/string/ascii.h> +#include <util/string/split.h> +#include <util/string/strip.h> + +//#define DBGOUT(args) Cout << args << Endl; +#define DBGOUT(args) + +namespace { + const TString BestCodings[] = { + "gzip", + "deflate", + "br", + "x-gzip", + "x-deflate", + "y-lzo", + "y-lzf", + "y-lzq", + "y-bzip2", + "y-lzma", + }; +} + +TString THttpParser::GetBestCompressionScheme() const { + if (AcceptEncodings_.contains("*")) { + return BestCodings[0]; + } + + for (auto& coding : BestCodings) { + if (AcceptEncodings_.contains(coding)) { + return coding; + } + } + + return TString(); +} + +bool THttpParser::FirstLineParser() { + if (Y_UNLIKELY(!ReadLine())) { + return false; + } + + CurrentLine_.swap(FirstLine_); + + try { + TStringBuf s(FirstLine_); + if (MessageType_ == Response) { + // Status-Line = HTTP-Version SP Status-Code SP Reason-Phrase CRLF + TStringBuf httpVersion, statusCode; + GetNext(s, ' ', httpVersion); + ParseHttpVersion(httpVersion); + GetNext(s, ' ', statusCode); + RetCode_ = FromString<unsigned>(statusCode); + } else { + // Request-Line = Method SP Request-URI SP HTTP-Version CRLF + TStringBuf httpVersion = s.After(' ').After(' '); + ParseHttpVersion(httpVersion); + } + } catch (...) { + throw THttpParseException() << "Cannot parse first line: " << CurrentExceptionMessage() << " First 80 chars of line: " << FirstLine_.substr(0, Min<size_t>(80ull, FirstLine_.size())).Quote(); + } + + return HeadersParser(); +} + +bool THttpParser::HeadersParser() { + while (ReadLine()) { + if (!CurrentLine_) { + //end of headers + DBGOUT("end of headers()"); + ParseHeaderLine(); + + if (HasContentLength_) { + if (ContentLength_ == 0) { + return OnEndParsing(); + } + + if (ContentLength_ < 1000000) { + Content_.reserve(ContentLength_ + 1); + } + } + + return !!ChunkInputState_ ? ChunkedContentParser() : ContentParser(); + } + + if (CurrentLine_[0] == ' ' || CurrentLine_[0] == '\t') { + //continue previous header-line + HeaderLine_ += CurrentLine_; + CurrentLine_.remove(0); + } else { + ParseHeaderLine(); + HeaderLine_.swap(CurrentLine_); + } + } + + Parser_ = &THttpParser::HeadersParser; + return false; +} + +bool THttpParser::ContentParser() { + DBGOUT("Content parsing()"); + if (HasContentLength_) { + size_t rd = Min<size_t>(DataEnd_ - Data_, ContentLength_ - Content_.size()); + Content_.append(Data_, rd); + Data_ += rd; + DBGOUT("Content parsing: " << Content_.Size() << " from " << ContentLength_); + if (Content_.size() == ContentLength_) { + return OnEndParsing(); + } + } else { + if (MessageType_ == Request) { + return OnEndParsing(); //RFC2616 4.4-5 + } else if (Y_UNLIKELY(RetCode() < 200 || RetCode() == 204 || RetCode() == 304)) { + return OnEndParsing(); //RFC2616 4.4-1 (but not checked HEAD request type !) + } + + Content_.append(Data_, DataEnd_); + Data_ = DataEnd_; + } + Parser_ = &THttpParser::ContentParser; + return false; +} + +bool THttpParser::ChunkedContentParser() { + DBGOUT("ReadChunkedContent"); + TChunkInputState& ci = *ChunkInputState_; + + if (Content_.capacity() < static_cast<size_t>(DataEnd_ - Data_)) { + //try reduce memory reallocations + Content_.reserve(DataEnd_ - Data_); + } + + do { + if (!ci.LeftBytes_) { + if (Y_UNLIKELY(!ReadLine())) { //read first chunk size or CRLF from prev chunk or CRLF from last chunk + break; + } + + if (Y_UNLIKELY(ci.ReadLastChunk_)) { + return OnEndParsing(); + } + + if (!CurrentLine_) { + // skip crlf from previous chunk + if (!ReadLine()) { + break; + } + } + Y_ENSURE(CurrentLine_.size(), "NEH: LeftBytes hex number cannot be empty. "); + size_t size = CurrentLine_.find_first_of(" \t;"); + if (size == TString::npos) { + size = CurrentLine_.size(); + } + ci.LeftBytes_ = IntFromString<ui32, 16, char>(CurrentLine_.c_str(), size); + CurrentLine_.remove(0); + if (!ci.LeftBytes_) { //detectect end of context marker - zero-size chunk, need read CRLF after empty chunk + ci.ReadLastChunk_ = true; + if (ReadLine()) { + return OnEndParsing(); + } else { + break; + } + } + } + + size_t rd = Min<size_t>(DataEnd_ - Data_, ci.LeftBytes_); + Content_.append(Data_, rd); + Data_ += rd; + ci.LeftBytes_ -= rd; + } while (Data_ != DataEnd_); + + Parser_ = &THttpParser::ChunkedContentParser; + return false; +} + +bool THttpParser::OnEndParsing() { + Parser_ = &THttpParser::OnEndParsing; + ExtraDataSize_ = DataEnd_ - Data_; + return true; +} + +//continue read to CurrentLine_ +bool THttpParser::ReadLine() { + TStringBuf in(Data_, DataEnd_); + size_t endl = in.find('\n'); + + if (Y_UNLIKELY(endl == TStringBuf::npos)) { + //input line not completed + CurrentLine_.append(Data_, DataEnd_); + return false; + } + + CurrentLine_.append(in.data(), endl); + if (Y_LIKELY(CurrentLine_.size())) { + //remove '\r' from tail + size_t withoutCR = CurrentLine_.size() - 1; + if (CurrentLine_[withoutCR] == '\r') { + CurrentLine_.remove(withoutCR); + } + } + + //Cout << "ReadLine:" << CurrentLine_ << Endl; + Data_ += endl + 1; + return true; +} + +void THttpParser::ParseHttpVersion(TStringBuf httpVersion) { + if (!httpVersion.StartsWith("HTTP/", 5)) { + throw yexception() << "expect 'HTTP/'"; + } + httpVersion.Skip(5); + { + TStringBuf major, minor; + Split(httpVersion, '.', major, minor); + HttpVersion_.Major = FromString<unsigned>(major); + HttpVersion_.Minor = FromString<unsigned>(minor); + if (Y_LIKELY(HttpVersion_.Major > 1 || HttpVersion_.Minor > 0)) { + // since HTTP/1.1 Keep-Alive is default behaviour + KeepAlive_ = true; + } + } +} + +void THttpParser::ParseHeaderLine() { + if (!!HeaderLine_) { + if (CollectHeaders_) { + THttpInputHeader hdr(HeaderLine_); + + Headers_.AddHeader(hdr); + + ApplyHeaderLine(hdr.Name(), hdr.Value()); + } else { + //some dirty optimization (avoid reallocation new strings) + size_t pos = HeaderLine_.find(':'); + + if (pos == TString::npos) { + ythrow THttpParseException() << "can not parse http header(" << HeaderLine_.Quote() << ")"; + } + + TStringBuf name(StripString(TStringBuf(HeaderLine_.begin(), HeaderLine_.begin() + pos))); + TStringBuf val(StripString(TStringBuf(HeaderLine_.begin() + pos + 1, HeaderLine_.end()))); + ApplyHeaderLine(name, val); + } + HeaderLine_.remove(0); + } +} + +void THttpParser::OnEof() { + if (Parser_ == &THttpParser::ContentParser && !HasContentLength_ && !ChunkInputState_) { + return; //end of content determined by end of input + } + throw THttpException() << TStringBuf("incompleted http response"); +} + +bool THttpParser::DecodeContent() { + if (!ContentEncoding_ || ContentEncoding_ == "identity" || ContentEncoding_ == "none") { + DecodedContent_ = Content_; + return false; + } + + TMemoryInput in(Content_.data(), Content_.size()); + if (ContentEncoding_ == "gzip") { + auto decompressor = TZLibDecompress(&in, ZLib::GZip); + if (!GzipAllowMultipleStreams_) { + decompressor.SetAllowMultipleStreams(false); + } + DecodedContent_ = decompressor.ReadAll(); + } else if (ContentEncoding_ == "deflate") { + + //https://tools.ietf.org/html/rfc1950 + bool definitelyNoZlibHeader; + if (Content_.size() < 2) { + definitelyNoZlibHeader = true; + } else { + const ui16 cmf = static_cast<ui8>(Content_[0]); + const ui16 flg = static_cast<ui8>(Content_[1]); + definitelyNoZlibHeader = ((cmf << 8) | flg) % 31 != 0; + } + + try { + DecodedContent_ = TZLibDecompress(&in, definitelyNoZlibHeader ? ZLib::Raw : ZLib::ZLib).ReadAll(); + } + catch(...) { + if (definitelyNoZlibHeader) { + throw; + } + TMemoryInput retryInput(Content_.data(), Content_.size()); + DecodedContent_ = TZLibDecompress(&retryInput, ZLib::Raw).ReadAll(); + } + } else if (ContentEncoding_.StartsWith("z-")) { + // opposite for library/cpp/http/io/stream.h + const NBlockCodecs::ICodec* codec = nullptr; + try { + const TStringBuf codecName = TStringBuf(ContentEncoding_).SubStr(2); + if (codecName.StartsWith("zstd06") || codecName.StartsWith("zstd08")) { + ythrow NBlockCodecs::TNotFound() << codecName; + } + codec = NBlockCodecs::Codec(codecName); + } catch(const NBlockCodecs::TNotFound& exc) { + throw THttpParseException() << "Unsupported content-encoding method: " << exc.AsStrBuf(); + } + NBlockCodecs::TDecodedInput decoder(&in, codec); + DecodedContent_ = decoder.ReadAll(); + } else { + throw THttpParseException() << "Unsupported content-encoding method: " << ContentEncoding_; + } + return true; +} + +void THttpParser::ApplyHeaderLine(const TStringBuf& name, const TStringBuf& val) { + if (AsciiEqualsIgnoreCase(name, TStringBuf("connection"))) { + KeepAlive_ = AsciiEqualsIgnoreCase(val, TStringBuf("keep-alive")); + } else if (AsciiEqualsIgnoreCase(name, TStringBuf("content-length"))) { + Y_ENSURE(val.size(), "NEH: Content-Length cannot be empty string. "); + ContentLength_ = FromString<ui64>(val); + HasContentLength_ = true; + } else if (AsciiEqualsIgnoreCase(name, TStringBuf("transfer-encoding"))) { + if (AsciiEqualsIgnoreCase(val, TStringBuf("chunked"))) { + ChunkInputState_ = new TChunkInputState(); + } + } else if (AsciiEqualsIgnoreCase(name, TStringBuf("accept-encoding"))) { + TStringBuf encodings(val); + while (encodings.size()) { + TStringBuf enc = encodings.NextTok(',').After(' ').Before(' '); + if (!enc) { + continue; + } + TString s(enc); + s.to_lower(); + AcceptEncodings_.insert(s); + } + } else if (AsciiEqualsIgnoreCase(name, TStringBuf("content-encoding"))) { + TString s(val); + s.to_lower(); + ContentEncoding_ = s; + } +} diff --git a/library/cpp/http/push_parser/http_parser.h b/library/cpp/http/push_parser/http_parser.h new file mode 100644 index 00000000000..8757a3ef9a2 --- /dev/null +++ b/library/cpp/http/push_parser/http_parser.h @@ -0,0 +1,165 @@ +#pragma once + +#include <util/generic/string.h> +#include <util/generic/strbuf.h> +#include <util/generic/yexception.h> +#include <util/generic/hash_set.h> +#include <util/string/cast.h> +#include <library/cpp/http/io/stream.h> + +struct THttpVersion { + unsigned Major = 1; + unsigned Minor = 0; +}; + +//http requests parser for async/callbacks arch. (uggly state-machine) +//usage, - call Parse(...), if returned 'true' - all message parsed, +//external (non entered in message) bytes in input data counted by GetExtraDataSize() +class THttpParser { +public: + enum TMessageType { + Request, + Response + }; + + THttpParser(TMessageType mt = Response) + : Parser_(&THttpParser::FirstLineParser) + , MessageType_(mt) + { + } + + inline void DisableCollectingHeaders() noexcept { + CollectHeaders_ = false; + } + + inline void SetGzipAllowMultipleStreams(bool allow) noexcept { + GzipAllowMultipleStreams_ = allow; + } + + /// @return true on end parsing (GetExtraDataSize() return amount not used bytes) + /// throw exception on bad http format (unsupported encoding, etc) + /// sz == 0 signaling end of input stream + bool Parse(const char* data, size_t sz) { + if (ParseImpl(data, sz)) { + DecodeContent(); + return true; + } + return false; + } + + const char* Data() const noexcept { + return Data_; + } + size_t GetExtraDataSize() const noexcept { + return ExtraDataSize_; + } + + const TString& FirstLine() const noexcept { + return FirstLine_; + } + + unsigned RetCode() const noexcept { + return RetCode_; + } + + const THttpVersion& HttpVersion() const noexcept { + return HttpVersion_; + } + + const THttpHeaders& Headers() const noexcept { + return Headers_; + } + + bool IsKeepAlive() const noexcept { + return KeepAlive_; + } + + bool GetContentLength(ui64& value) const noexcept { + if (!HasContentLength_) { + return false; + } + + value = ContentLength_; + return true; + } + + TString GetBestCompressionScheme() const; + + const TString& Content() const noexcept { + return Content_; + } + + const TString& DecodedContent() const noexcept { + return DecodedContent_; + } + + void Prepare() { + HeaderLine_.reserve(128); + FirstLine_.reserve(128); + } + +private: + bool ParseImpl(const char* data, size_t sz) { + Data_ = data; + DataEnd_ = data + sz; + if (sz == 0) { + OnEof(); + return true; + } + return (this->*Parser_)(); + } + // stage parsers + bool FirstLineParser(); + bool HeadersParser(); + bool ContentParser(); + bool ChunkedContentParser(); + bool OnEndParsing(); + + // continue read to CurrentLine_ + bool ReadLine(); + + void ParseHttpVersion(TStringBuf httpVersion); + void ParseHeaderLine(); + + void OnEof(); + bool DecodeContent(); + + void ApplyHeaderLine(const TStringBuf& name, const TStringBuf& val); + + typedef bool (THttpParser::*TParser)(); + + TParser Parser_; //current parser (stage) + TMessageType MessageType_ = Response; + bool CollectHeaders_ = true; + bool GzipAllowMultipleStreams_ = true; + + // parsed data + const char* Data_ = nullptr; + const char* DataEnd_ = nullptr; + TString CurrentLine_; + TString HeaderLine_; + + size_t ExtraDataSize_ = 0; + + // headers + TString FirstLine_; + THttpVersion HttpVersion_; + unsigned RetCode_ = 0; + THttpHeaders Headers_; + bool KeepAlive_ = false; + THashSet<TString> AcceptEncodings_; + + TString ContentEncoding_; + bool HasContentLength_ = false; + ui64 ContentLength_ = 0; + + struct TChunkInputState { + size_t LeftBytes_ = 0; + bool ReadLastChunk_ = false; + }; + + TAutoPtr<TChunkInputState> ChunkInputState_; + + TString Content_; + TString DecodedContent_; +}; |