diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/http/io | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/http/io')
-rw-r--r-- | library/cpp/http/io/chunk.cpp | 246 | ||||
-rw-r--r-- | library/cpp/http/io/chunk.h | 47 | ||||
-rw-r--r-- | library/cpp/http/io/chunk_ut.cpp | 105 | ||||
-rw-r--r-- | library/cpp/http/io/compression.cpp | 66 | ||||
-rw-r--r-- | library/cpp/http/io/compression.h | 72 | ||||
-rw-r--r-- | library/cpp/http/io/compression_ut.cpp | 60 | ||||
-rw-r--r-- | library/cpp/http/io/fuzz/main.cpp | 15 | ||||
-rw-r--r-- | library/cpp/http/io/fuzz/ya.make | 18 | ||||
-rw-r--r-- | library/cpp/http/io/headers.cpp | 108 | ||||
-rw-r--r-- | library/cpp/http/io/headers.h | 125 | ||||
-rw-r--r-- | library/cpp/http/io/headers_ut.cpp | 176 | ||||
-rw-r--r-- | library/cpp/http/io/list_codings/main.cpp | 8 | ||||
-rw-r--r-- | library/cpp/http/io/list_codings/ya.make | 13 | ||||
-rw-r--r-- | library/cpp/http/io/stream.cpp | 1005 | ||||
-rw-r--r-- | library/cpp/http/io/stream.h | 178 | ||||
-rw-r--r-- | library/cpp/http/io/stream_ut.cpp | 732 | ||||
-rw-r--r-- | library/cpp/http/io/stream_ut_medium.cpp | 54 | ||||
-rw-r--r-- | library/cpp/http/io/ut/medium/ya.make | 11 | ||||
-rw-r--r-- | library/cpp/http/io/ut/ya.make | 16 | ||||
-rw-r--r-- | library/cpp/http/io/ya.make | 22 |
20 files changed, 3077 insertions, 0 deletions
diff --git a/library/cpp/http/io/chunk.cpp b/library/cpp/http/io/chunk.cpp new file mode 100644 index 0000000000..6975d9eac1 --- /dev/null +++ b/library/cpp/http/io/chunk.cpp @@ -0,0 +1,246 @@ +#include "chunk.h" + +#include "headers.h" + +#include <util/string/cast.h> +#include <util/generic/utility.h> +#include <util/generic/yexception.h> + +static inline size_t ParseHex(const TString& s) { + if (s.empty()) { + ythrow yexception() << "can not parse chunk length(empty string)"; + } + + size_t ret = 0; + + for (TString::const_iterator c = s.begin(); c != s.end(); ++c) { + const char ch = *c; + + if (ch >= '0' && ch <= '9') { + ret *= 16; + ret += ch - '0'; + } else if (ch >= 'a' && ch <= 'f') { + ret *= 16; + ret += 10 + ch - 'a'; + } else if (ch >= 'A' && ch <= 'F') { + ret *= 16; + ret += 10 + ch - 'A'; + } else if (ch == ';') { + break; + } else if (isspace(ch)) { + continue; + } else { + ythrow yexception() << "can not parse chunk length(" << s.data() << ")"; + } + } + + return ret; +} + +static inline char* ToHex(size_t len, char* buf) { + do { + const size_t val = len % 16; + + *--buf = (val < 10) ? (val + '0') : (val - 10 + 'a'); + len /= 16; + } while (len); + + return buf; +} + +class TChunkedInput::TImpl { +public: + inline TImpl(IInputStream* slave, TMaybe<THttpHeaders>* trailers) + : Slave_(slave) + , Trailers_(trailers) + , Pending_(0) + , LastChunkReaded_(false) + { + if (Trailers_) { + Trailers_->Clear(); + } + } + + inline ~TImpl() { + } + + inline size_t Read(void* buf, size_t len) { + return Perform(len, [this, buf](size_t toRead) { return Slave_->Read(buf, toRead); }); + } + + inline size_t Skip(size_t len) { + return Perform(len, [this](size_t toSkip) { return Slave_->Skip(toSkip); }); + } + +private: + template <class Operation> + inline size_t Perform(size_t len, const Operation& operation) { + if (!HavePendingData()) { + return 0; + } + + const size_t toProcess = Min(Pending_, len); + + if (toProcess) { + const size_t processed = operation(toProcess); + + if (!processed) { + ythrow yexception() << "malformed http chunk"; + } + + Pending_ -= processed; + + return processed; + } + + return 0; + } + + inline bool HavePendingData() { + if (LastChunkReaded_) { + return false; + } + + if (!Pending_) { + if (!ProceedToNextChunk()) { + return false; + } + } + + return true; + } + + inline bool ProceedToNextChunk() { + TString len(Slave_->ReadLine()); + + if (len.empty()) { + /* + * skip crlf from previous chunk + */ + + len = Slave_->ReadLine(); + } + + Pending_ = ParseHex(len); + + if (Pending_) { + return true; + } + + if (Trailers_) { + Trailers_->ConstructInPlace(Slave_); + } + LastChunkReaded_ = true; + + return false; + } + +private: + IInputStream* Slave_; + TMaybe<THttpHeaders>* Trailers_; + size_t Pending_; + bool LastChunkReaded_; +}; + +TChunkedInput::TChunkedInput(IInputStream* slave, TMaybe<THttpHeaders>* trailers) + : Impl_(new TImpl(slave, trailers)) +{ +} + +TChunkedInput::~TChunkedInput() { +} + +size_t TChunkedInput::DoRead(void* buf, size_t len) { + return Impl_->Read(buf, len); +} + +size_t TChunkedInput::DoSkip(size_t len) { + return Impl_->Skip(len); +} + +class TChunkedOutput::TImpl { + typedef IOutputStream::TPart TPart; + +public: + inline TImpl(IOutputStream* slave) + : Slave_(slave) + { + } + + inline ~TImpl() { + } + + inline void Write(const void* buf, size_t len) { + const char* ptr = (const char*)buf; + + while (len) { + const size_t portion = Min<size_t>(len, 1024 * 16); + + WriteImpl(ptr, portion); + + ptr += portion; + len -= portion; + } + } + + inline void WriteImpl(const void* buf, size_t len) { + char tmp[32]; + char* e = tmp + sizeof(tmp); + char* b = ToHex(len, e); + + const TPart parts[] = { + TPart(b, e - b), + TPart::CrLf(), + TPart(buf, len), + TPart::CrLf(), + }; + + Slave_->Write(parts, sizeof(parts) / sizeof(*parts)); + } + + inline void Flush() { + Slave_->Flush(); + } + + inline void Finish() { + Slave_->Write("0\r\n\r\n", 5); + + Flush(); + } + +private: + IOutputStream* Slave_; +}; + +TChunkedOutput::TChunkedOutput(IOutputStream* slave) + : Impl_(new TImpl(slave)) +{ +} + +TChunkedOutput::~TChunkedOutput() { + try { + Finish(); + } catch (...) { + } +} + +void TChunkedOutput::DoWrite(const void* buf, size_t len) { + if (Impl_.Get()) { + Impl_->Write(buf, len); + } else { + ythrow yexception() << "can not write to finished stream"; + } +} + +void TChunkedOutput::DoFlush() { + if (Impl_.Get()) { + Impl_->Flush(); + } +} + +void TChunkedOutput::DoFinish() { + if (Impl_.Get()) { + Impl_->Finish(); + Impl_.Destroy(); + } +} diff --git a/library/cpp/http/io/chunk.h b/library/cpp/http/io/chunk.h new file mode 100644 index 0000000000..88d89fafda --- /dev/null +++ b/library/cpp/http/io/chunk.h @@ -0,0 +1,47 @@ +#pragma once + +#include <util/stream/output.h> +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> + +class THttpHeaders; + +/// @addtogroup Streams_Chunked +/// @{ +/// Ввод данных порциями. +/// @details Последовательное чтение блоков данных. Предполагается, что +/// данные записаны в виде <длина блока><блок данных>. +class TChunkedInput: public IInputStream { +public: + /// Если передан указатель на trailers, то туда будут записаны HTTP trailer'ы (возможно пустые), + /// которые идут после чанков. + TChunkedInput(IInputStream* slave, TMaybe<THttpHeaders>* trailers = nullptr); + ~TChunkedInput() override; + +private: + size_t DoRead(void* buf, size_t len) override; + size_t DoSkip(size_t len) override; + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + +/// Вывод данных порциями. +/// @details Вывод данных блоками в виде <длина блока><блок данных>. Если объем +/// данных превышает 64K, они записываются в виде n блоков по 64K + то, что осталось. +class TChunkedOutput: public IOutputStream { +public: + TChunkedOutput(IOutputStream* slave); + ~TChunkedOutput() override; + +private: + void DoWrite(const void* buf, size_t len) override; + void DoFlush() override; + void DoFinish() override; + +private: + class TImpl; + THolder<TImpl> Impl_; +}; +/// @} diff --git a/library/cpp/http/io/chunk_ut.cpp b/library/cpp/http/io/chunk_ut.cpp new file mode 100644 index 0000000000..da283f8568 --- /dev/null +++ b/library/cpp/http/io/chunk_ut.cpp @@ -0,0 +1,105 @@ +#include "chunk.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/stream/file.h> +#include <util/system/tempfile.h> +#include <util/stream/null.h> + +#define CDATA "./chunkedio" + +Y_UNIT_TEST_SUITE(TestChunkedIO) { + static const char test_data[] = "87s6cfbsudg cuisg s igasidftasiy tfrcua6s"; + + TString CombString(const TString& s, size_t chunkSize) { + TString result; + for (size_t pos = 0; pos < s.size(); pos += 2 * chunkSize) + result += s.substr(pos, chunkSize); + return result; + } + + void WriteTestData(IOutputStream * stream, TString * contents) { + contents->clear(); + for (size_t i = 0; i < sizeof(test_data); ++i) { + stream->Write(test_data, i); + contents->append(test_data, i); + } + } + + void ReadInSmallChunks(IInputStream * stream, TString * contents) { + char buf[11]; + size_t read = 0; + + contents->clear(); + do { + read = stream->Read(buf, sizeof(buf)); + contents->append(buf, read); + } while (read > 0); + } + + void ReadCombed(IInputStream * stream, TString * contents, size_t chunkSize) { + Y_ASSERT(chunkSize < 128); + char buf[128]; + + contents->clear(); + while (true) { + size_t read = stream->Load(buf, chunkSize); + contents->append(buf, read); + if (read == 0) + break; + + size_t toSkip = chunkSize; + size_t skipped = 0; + do { + skipped = stream->Skip(toSkip); + toSkip -= skipped; + } while (skipped != 0 && toSkip != 0); + } + } + + Y_UNIT_TEST(TestChunkedIo) { + TTempFile tmpFile(CDATA); + TString tmp; + + { + TUnbufferedFileOutput fo(CDATA); + TChunkedOutput co(&fo); + WriteTestData(&co, &tmp); + } + + { + TUnbufferedFileInput fi(CDATA); + TChunkedInput ci(&fi); + TString r; + + ReadInSmallChunks(&ci, &r); + + UNIT_ASSERT_EQUAL(r, tmp); + } + + { + TUnbufferedFileInput fi(CDATA); + TChunkedInput ci(&fi); + TString r; + + ReadCombed(&ci, &r, 11); + + UNIT_ASSERT_EQUAL(r, CombString(tmp, 11)); + } + } + + Y_UNIT_TEST(TestBadChunk) { + bool hasError = false; + + try { + TString badChunk = "10\r\nqwerty"; + TMemoryInput mi(badChunk.data(), badChunk.size()); + TChunkedInput ci(&mi); + TransferData(&ci, &Cnull); + } catch (...) { + hasError = true; + } + + UNIT_ASSERT(hasError); + } +} diff --git a/library/cpp/http/io/compression.cpp b/library/cpp/http/io/compression.cpp new file mode 100644 index 0000000000..8fa1f62ae6 --- /dev/null +++ b/library/cpp/http/io/compression.cpp @@ -0,0 +1,66 @@ +#include "compression.h" + +#if defined(ENABLE_GPL) +#include <library/cpp/streams/lz/lz.h> +#endif + +#include <library/cpp/streams/brotli/brotli.h> +#include <library/cpp/streams/lzma/lzma.h> +#include <library/cpp/streams/bzip2/bzip2.h> + +#include <library/cpp/blockcodecs/stream.h> +#include <library/cpp/blockcodecs/codecs.h> + +#include <util/stream/zlib.h> + + +TCompressionCodecFactory::TCompressionCodecFactory() { + auto gzip = [](auto s) { + return MakeHolder<TZLibDecompress>(s); + }; + + Add("gzip", gzip, [](auto s) { return MakeHolder<TZLibCompress>(s, ZLib::GZip); }); + Add("deflate", gzip, [](auto s) { return MakeHolder<TZLibCompress>(s, ZLib::ZLib); }); + Add("br", [](auto s) { return MakeHolder<TBrotliDecompress>(s); }, [](auto s) { return MakeHolder<TBrotliCompress>(s, 4); }); + Add("x-gzip", gzip, [](auto s) { return MakeHolder<TZLibCompress>(s, ZLib::GZip); }); + Add("x-deflate", gzip, [](auto s) { return MakeHolder<TZLibCompress>(s, ZLib::ZLib); }); + +#if defined(ENABLE_GPL) + const ui16 bs = 32 * 1024; + + Add("y-lzo", [](auto s) { return MakeHolder<TLzoDecompress>(s); }, [bs](auto s) { return MakeHolder<TLazy<TLzoCompress> >(s, bs); }); + Add("y-lzf", [](auto s) { return MakeHolder<TLzfDecompress>(s); }, [bs](auto s) { return MakeHolder<TLazy<TLzfCompress> >(s, bs); }); + Add("y-lzq", [](auto s) { return MakeHolder<TLzqDecompress>(s); }, [bs](auto s) { return MakeHolder<TLazy<TLzqCompress> >(s, bs); }); +#endif + + Add("y-bzip2", [](auto s) { return MakeHolder<TBZipDecompress>(s); }, [](auto s) { return MakeHolder<TBZipCompress>(s); }); + Add("y-lzma", [](auto s) { return MakeHolder<TLzmaDecompress>(s); }, [](auto s) { return MakeHolder<TLzmaCompress>(s); }); + + for (auto codecName : NBlockCodecs::ListAllCodecs()) { + if (codecName.StartsWith("zstd06")) { + continue; + } + + if (codecName.StartsWith("zstd08")) { + continue; + } + + auto codec = NBlockCodecs::Codec(codecName); + + auto enc = [codec](auto s) { + return MakeHolder<NBlockCodecs::TCodedOutput>(s, codec, 32 * 1024); + }; + + auto dec = [codec](auto s) { + return MakeHolder<NBlockCodecs::TDecodedInput>(s, codec); + }; + + Add(TString("z-") + codecName, dec, enc); + } +} + +void TCompressionCodecFactory::Add(TStringBuf name, TDecoderConstructor d, TEncoderConstructor e) { + Strings_.emplace_back(name); + Codecs_[Strings_.back()] = TCodec{d, e}; + BestCodecs_.emplace_back(Strings_.back()); +} diff --git a/library/cpp/http/io/compression.h b/library/cpp/http/io/compression.h new file mode 100644 index 0000000000..f16c4a18eb --- /dev/null +++ b/library/cpp/http/io/compression.h @@ -0,0 +1,72 @@ +#pragma once + +#include "stream.h" + +#include <util/generic/deque.h> +#include <util/generic/hash.h> + +class TCompressionCodecFactory { +public: + using TDecoderConstructor = std::function<THolder<IInputStream>(IInputStream*)>; + using TEncoderConstructor = std::function<THolder<IOutputStream>(IOutputStream*)>; + + TCompressionCodecFactory(); + + static inline TCompressionCodecFactory& Instance() noexcept { + return *SingletonWithPriority<TCompressionCodecFactory, 0>(); + } + + inline const TDecoderConstructor* FindDecoder(TStringBuf name) const { + if (auto codec = Codecs_.FindPtr(name)) { + return &codec->Decoder; + } + + return nullptr; + } + + inline const TEncoderConstructor* FindEncoder(TStringBuf name) const { + if (auto codec = Codecs_.FindPtr(name)) { + return &codec->Encoder; + } + + return nullptr; + } + + inline TArrayRef<const TStringBuf> GetBestCodecs() const { + return BestCodecs_; + } + +private: + void Add(TStringBuf name, TDecoderConstructor d, TEncoderConstructor e); + + struct TCodec { + TDecoderConstructor Decoder; + TEncoderConstructor Encoder; + }; + + TDeque<TString> Strings_; + THashMap<TStringBuf, TCodec> Codecs_; + TVector<TStringBuf> BestCodecs_; +}; + +namespace NHttp { + template <typename F> + TString ChooseBestCompressionScheme(F accepted, TArrayRef<const TStringBuf> available) { + if (available.empty()) { + return "identity"; + } + + if (accepted("*")) { + return TString(available[0]); + } + + for (const auto& coding : available) { + TString s(coding); + if (accepted(s)) { + return s; + } + } + + return "identity"; + } +} diff --git a/library/cpp/http/io/compression_ut.cpp b/library/cpp/http/io/compression_ut.cpp new file mode 100644 index 0000000000..2f3d131f8c --- /dev/null +++ b/library/cpp/http/io/compression_ut.cpp @@ -0,0 +1,60 @@ +#include "stream.h" +#include "compression.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/testing/unittest/tests_data.h> + +#include <util/stream/zlib.h> +#include <util/generic/hash_set.h> + +Y_UNIT_TEST_SUITE(THttpCompressionTest) { + static const TString DATA = "I'm a teapot"; + + Y_UNIT_TEST(TestGetBestCodecs) { + UNIT_ASSERT(TCompressionCodecFactory::Instance().GetBestCodecs().size() > 0); + } + + Y_UNIT_TEST(TestEncoder) { + TStringStream buffer; + + { + auto encoder = TCompressionCodecFactory::Instance().FindEncoder("gzip"); + UNIT_ASSERT(encoder); + + auto encodedStream = (*encoder)(&buffer); + encodedStream->Write(DATA); + } + + TZLibDecompress decompressor(&buffer); + UNIT_ASSERT_EQUAL(decompressor.ReadAll(), DATA); + } + + Y_UNIT_TEST(TestDecoder) { + TStringStream buffer; + + { + TZLibCompress compressor(TZLibCompress::TParams(&buffer).SetType(ZLib::GZip)); + compressor.Write(DATA); + } + + auto decoder = TCompressionCodecFactory::Instance().FindDecoder("gzip"); + UNIT_ASSERT(decoder); + + auto decodedStream = (*decoder)(&buffer); + UNIT_ASSERT_EQUAL(decodedStream->ReadAll(), DATA); + } + + Y_UNIT_TEST(TestChooseBestCompressionScheme) { + THashSet<TString> accepted; + + auto checkAccepted = [&accepted](const TString& v) { + return accepted.contains(v); + }; + + UNIT_ASSERT_VALUES_EQUAL("identity", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); + accepted.insert("deflate"); + UNIT_ASSERT_VALUES_EQUAL("deflate", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); + accepted.insert("*"); + UNIT_ASSERT_VALUES_EQUAL("gzip", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); + } +} // THttpCompressionTest suite diff --git a/library/cpp/http/io/fuzz/main.cpp b/library/cpp/http/io/fuzz/main.cpp new file mode 100644 index 0000000000..8ded9c7e32 --- /dev/null +++ b/library/cpp/http/io/fuzz/main.cpp @@ -0,0 +1,15 @@ +#include <library/cpp/http/io/stream.h> + +#include <util/generic/vector.h> +#include <util/stream/mem.h> + +extern "C" int LLVMFuzzerTestOneInput(const ui8* data, size_t size) { + TMemoryInput mi(data, size); + + try { + THttpInput(&mi).ReadAll(); + } catch (...) { + } + + return 0; // Non-zero return values are reserved for future use. +} diff --git a/library/cpp/http/io/fuzz/ya.make b/library/cpp/http/io/fuzz/ya.make new file mode 100644 index 0000000000..8b3ccb1969 --- /dev/null +++ b/library/cpp/http/io/fuzz/ya.make @@ -0,0 +1,18 @@ +FUZZ() + +OWNER( + pg + g:util +) + +PEERDIR( + library/cpp/http/io +) + +SIZE(MEDIUM) + +SRCS( + main.cpp +) + +END() diff --git a/library/cpp/http/io/headers.cpp b/library/cpp/http/io/headers.cpp new file mode 100644 index 0000000000..4ec27a29e8 --- /dev/null +++ b/library/cpp/http/io/headers.cpp @@ -0,0 +1,108 @@ +#include "headers.h" +#include "stream.h" + +#include <util/generic/strbuf.h> +#include <util/generic/yexception.h> +#include <util/stream/output.h> +#include <util/string/ascii.h> +#include <util/string/cast.h> +#include <util/string/strip.h> + +static inline TStringBuf Trim(const char* b, const char* e) noexcept { + return StripString(TStringBuf(b, e)); +} + +THttpInputHeader::THttpInputHeader(const TStringBuf header) { + size_t pos = header.find(':'); + + if (pos == TString::npos) { + ythrow THttpParseException() << "can not parse http header(" << TString{header}.Quote() << ")"; + } + + Name_ = TString(header.cbegin(), header.cbegin() + pos); + Value_ = ::ToString(Trim(header.cbegin() + pos + 1, header.cend())); +} + +THttpInputHeader::THttpInputHeader(TString name, TString value) + : Name_(std::move(name)) + , Value_(std::move(value)) +{ +} + +void THttpInputHeader::OutTo(IOutputStream* stream) const { + typedef IOutputStream::TPart TPart; + + const TPart parts[] = { + TPart(Name_), + TPart(": ", 2), + TPart(Value_), + TPart::CrLf(), + }; + + stream->Write(parts, sizeof(parts) / sizeof(*parts)); +} + +THttpHeaders::THttpHeaders(IInputStream* stream) { + TString header; + TString line; + + bool rdOk = stream->ReadLine(header); + while (rdOk && !header.empty()) { + rdOk = stream->ReadLine(line); + + if (rdOk && ((line[0] == ' ') || (line[0] == '\t'))) { + header += line; + } else { + AddHeader(THttpInputHeader(header)); + header = line; + } + } +} + +bool THttpHeaders::HasHeader(const TStringBuf header) const { + return FindHeader(header); +} + +const THttpInputHeader* THttpHeaders::FindHeader(const TStringBuf header) const { + for (const auto& hdr : Headers_) { + if (AsciiCompareIgnoreCase(hdr.Name(), header) == 0) { + return &hdr; + } + } + return nullptr; +} + +void THttpHeaders::RemoveHeader(const TStringBuf header) { + for (auto h = Headers_.begin(); h != Headers_.end(); ++h) { + if (AsciiCompareIgnoreCase(h->Name(), header) == 0) { + Headers_.erase(h); + return; + } + } +} + +void THttpHeaders::AddOrReplaceHeader(const THttpInputHeader& header) { + for (auto& hdr : Headers_) { + if (AsciiCompareIgnoreCase(hdr.Name(), header.Name()) == 0) { + hdr = header; + return; + } + } + + AddHeader(header); +} + +void THttpHeaders::AddHeader(THttpInputHeader header) { + Headers_.push_back(std::move(header)); +} + +void THttpHeaders::OutTo(IOutputStream* stream) const { + for (TConstIterator header = Begin(); header != End(); ++header) { + header->OutTo(stream); + } +} + +template <> +void Out<THttpHeaders>(IOutputStream& out, const THttpHeaders& h) { + h.OutTo(&out); +} diff --git a/library/cpp/http/io/headers.h b/library/cpp/http/io/headers.h new file mode 100644 index 0000000000..a71793d1c6 --- /dev/null +++ b/library/cpp/http/io/headers.h @@ -0,0 +1,125 @@ +#pragma once + +#include <util/generic/string.h> +#include <util/generic/strbuf.h> +#include <util/generic/deque.h> +#include <util/generic/vector.h> +#include <util/string/cast.h> + +class IInputStream; +class IOutputStream; + +/// @addtogroup Streams_HTTP +/// @{ +/// Объект, содержащий информацию о HTTP-заголовке. +class THttpInputHeader { +public: + /// @param[in] header - строка вида 'параметр: значение'. + THttpInputHeader(TStringBuf header); + /// @param[in] name - имя параметра. + /// @param[in] value - значение параметра. + THttpInputHeader(TString name, TString value); + + /// Возвращает имя параметра. + inline const TString& Name() const noexcept { + return Name_; + } + + /// Возвращает значение параметра. + inline const TString& Value() const noexcept { + return Value_; + } + + /// Записывает заголовок вида "имя параметра: значение\r\n" в поток. + void OutTo(IOutputStream* stream) const; + + /// Возвращает строку "имя параметра: значение". + inline TString ToString() const { + return Name_ + TStringBuf(": ") + Value_; + } + +private: + TString Name_; + TString Value_; +}; + +/// Контейнер для хранения HTTP-заголовков +class THttpHeaders { + using THeaders = TDeque<THttpInputHeader>; + +public: + using TConstIterator = THeaders::const_iterator; + + THttpHeaders() = default; + + /// Добавляет каждую строку из потока в контейнер, считая ее правильным заголовком. + THttpHeaders(IInputStream* stream); + + /// Стандартный итератор. + inline TConstIterator Begin() const noexcept { + return Headers_.begin(); + } + inline TConstIterator begin() const noexcept { + return Headers_.begin(); + } + + /// Стандартный итератор. + inline TConstIterator End() const noexcept { + return Headers_.end(); + } + inline TConstIterator end() const noexcept { + return Headers_.end(); + } + + /// Возвращает количество заголовков в контейнере. + inline size_t Count() const noexcept { + return Headers_.size(); + } + + /// Проверяет, содержит ли контейнер хотя бы один заголовок. + inline bool Empty() const noexcept { + return Headers_.empty(); + } + + /// Добавляет заголовок в контейнер. + void AddHeader(THttpInputHeader header); + + template <typename ValueType> + void AddHeader(TString name, const ValueType& value) { + AddHeader(THttpInputHeader(std::move(name), ToString(value))); + } + + /// Добавляет заголовок в контейнер, если тот не содержит заголовка + /// c таким же параметром. В противном случае, заменяет существующий + /// заголовок на новый. + void AddOrReplaceHeader(const THttpInputHeader& header); + + template <typename ValueType> + void AddOrReplaceHeader(TString name, const ValueType& value) { + AddOrReplaceHeader(THttpInputHeader(std::move(name), ToString(value))); + } + + // Проверяет, есть ли такой заголовок + bool HasHeader(TStringBuf header) const; + + /// Удаляет заголовок, если он есть. + void RemoveHeader(TStringBuf header); + + /// Ищет заголовок по указанному имени + /// Возвращает nullptr, если не нашел + const THttpInputHeader* FindHeader(TStringBuf header) const; + + /// Записывает все заголовки контейнера в поток. + /// @details Каждый заголовк записывается в виде "имя параметра: значение\r\n". + void OutTo(IOutputStream* stream) const; + + /// Обменивает наборы заголовков двух контейнеров. + void Swap(THttpHeaders& headers) noexcept { + Headers_.swap(headers.Headers_); + } + +private: + THeaders Headers_; +}; + +/// @} diff --git a/library/cpp/http/io/headers_ut.cpp b/library/cpp/http/io/headers_ut.cpp new file mode 100644 index 0000000000..1d23ef8fdc --- /dev/null +++ b/library/cpp/http/io/headers_ut.cpp @@ -0,0 +1,176 @@ +#include <util/generic/set.h> +#include <util/generic/string.h> +#include <util/generic/strbuf.h> +#include <utility> + +#include <library/cpp/http/io/headers.h> +#include <library/cpp/testing/unittest/registar.h> + +namespace { + class THeadersExistence { + public: + THeadersExistence() = default; + + THeadersExistence(const THttpHeaders& headers) { + for (THttpHeaders::TConstIterator it = headers.Begin(); + it != headers.End(); + ++it) { + Add(it->Name(), it->Value()); + } + } + + public: + void Add(TStringBuf name, TStringBuf value) { + Impl.emplace(TString(name), TString(value)); + } + + bool operator==(const THeadersExistence& rhs) const { + return Impl == rhs.Impl; + } + + private: + typedef TMultiSet<std::pair<TString, TString>> TImpl; + TImpl Impl; + }; +} + +bool operator==(const THeadersExistence& lhs, const THttpHeaders& rhs) { + return lhs == THeadersExistence(rhs); +} + +bool operator==(const THttpHeaders& lhs, const THeadersExistence& rhs) { + return THeadersExistence(lhs) == rhs; +} + +class THttpHeadersTest: public TTestBase { + UNIT_TEST_SUITE(THttpHeadersTest); + UNIT_TEST(TestAddOperation1Arg); + UNIT_TEST(TestAddOperation2Args); + UNIT_TEST(TestAddOrReplaceOperation1Arg); + UNIT_TEST(TestAddOrReplaceOperation2Args); + UNIT_TEST(TestAddHeaderTemplateness); + UNIT_TEST(TestFindHeader); + UNIT_TEST_SUITE_END(); + +private: + typedef void (*TAddHeaderFunction)(THttpHeaders&, TStringBuf name, TStringBuf value); + typedef void (*TAddOrReplaceHeaderFunction)(THttpHeaders&, TStringBuf name, TStringBuf value); + +public: + void TestAddOperation1Arg(); + void TestAddOperation2Args(); + void TestAddOrReplaceOperation1Arg(); + void TestAddOrReplaceOperation2Args(); + void TestAddHeaderTemplateness(); + void TestFindHeader(); + +private: + static void AddHeaderImpl1Arg(THttpHeaders& headers, TStringBuf name, TStringBuf value) { + headers.AddHeader(THttpInputHeader(TString(name), TString(value))); + } + + static void AddHeaderImpl2Args(THttpHeaders& headers, TStringBuf name, TStringBuf value) { + headers.AddHeader(TString(name), TString(value)); + } + + static void AddOrReplaceHeaderImpl1Arg(THttpHeaders& headers, TStringBuf name, TStringBuf value) { + headers.AddOrReplaceHeader(THttpInputHeader(TString(name), TString(value))); + } + + static void AddOrReplaceHeaderImpl2Args(THttpHeaders& headers, TStringBuf name, TStringBuf value) { + headers.AddOrReplaceHeader(TString(name), TString(value)); + } + + void DoTestAddOperation(TAddHeaderFunction); + void DoTestAddOrReplaceOperation(TAddHeaderFunction, TAddOrReplaceHeaderFunction); +}; + +UNIT_TEST_SUITE_REGISTRATION(THttpHeadersTest); + +void THttpHeadersTest::TestAddOperation1Arg() { + DoTestAddOperation(AddHeaderImpl1Arg); +} +void THttpHeadersTest::TestAddOperation2Args() { + DoTestAddOperation(AddHeaderImpl2Args); +} + +void THttpHeadersTest::TestAddOrReplaceOperation1Arg() { + DoTestAddOrReplaceOperation(AddHeaderImpl1Arg, AddOrReplaceHeaderImpl1Arg); +} +void THttpHeadersTest::TestAddOrReplaceOperation2Args() { + DoTestAddOrReplaceOperation(AddHeaderImpl2Args, AddOrReplaceHeaderImpl2Args); +} + +void THttpHeadersTest::DoTestAddOperation(TAddHeaderFunction addHeader) { + THttpHeaders h1; + + addHeader(h1, "h1", "v1"); + addHeader(h1, "h2", "v1"); + + addHeader(h1, "h3", "v1"); + addHeader(h1, "h3", "v2"); + addHeader(h1, "h3", "v2"); + + THeadersExistence h2; + + h2.Add("h1", "v1"); + h2.Add("h2", "v1"); + + h2.Add("h3", "v1"); + h2.Add("h3", "v2"); + h2.Add("h3", "v2"); + + UNIT_ASSERT(h2 == h1); +} + +// Sorry, but AddOrReplaceHeader replaces only first occurence +void THttpHeadersTest::DoTestAddOrReplaceOperation(TAddHeaderFunction addHeader, TAddOrReplaceHeaderFunction addOrReplaceHeader) { + THttpHeaders h1; + + addHeader(h1, "h1", "v1"); + + addOrReplaceHeader(h1, "h2", "v1"); + addOrReplaceHeader(h1, "h2", "v2"); + addOrReplaceHeader(h1, "h2", "v3"); + addHeader(h1, "h2", "v4"); + + addHeader(h1, "h3", "v1"); + addHeader(h1, "h3", "v2"); + addOrReplaceHeader(h1, "h3", "v3"); + + THeadersExistence h2; + + h2.Add("h1", "v1"); + + h2.Add("h2", "v3"); + h2.Add("h2", "v4"); + + h2.Add("h3", "v2"); + h2.Add("h3", "v3"); + + UNIT_ASSERT(h2 == h1); +} + +void THttpHeadersTest::TestAddHeaderTemplateness() { + THttpHeaders h1; + h1.AddHeader("h1", "v1"); + h1.AddHeader("h2", TString("v2")); + h1.AddHeader("h3", TStringBuf("v3")); + h1.AddHeader("h4", TStringBuf("v4")); + + THeadersExistence h2; + h2.Add("h1", "v1"); + h2.Add("h2", "v2"); + h2.Add("h3", "v3"); + h2.Add("h4", "v4"); + + UNIT_ASSERT(h1 == h2); +} + +void THttpHeadersTest::TestFindHeader() { + THttpHeaders sut; + sut.AddHeader("NaMe", "Value"); + + UNIT_ASSERT(sut.FindHeader("name")); + UNIT_ASSERT(sut.FindHeader("name")->Value() == "Value"); +} diff --git a/library/cpp/http/io/list_codings/main.cpp b/library/cpp/http/io/list_codings/main.cpp new file mode 100644 index 0000000000..9818d02bdf --- /dev/null +++ b/library/cpp/http/io/list_codings/main.cpp @@ -0,0 +1,8 @@ +#include <library/cpp/http/io/stream.h> +#include <util/stream/output.h> + +int main() { + for (auto codec : SupportedCodings()) { + Cout << codec << Endl; + } +} diff --git a/library/cpp/http/io/list_codings/ya.make b/library/cpp/http/io/list_codings/ya.make new file mode 100644 index 0000000000..e5c5fed6dc --- /dev/null +++ b/library/cpp/http/io/list_codings/ya.make @@ -0,0 +1,13 @@ +PROGRAM() + +OWNER(pg) + +PEERDIR( + library/cpp/http/io +) + +SRCS( + main.cpp +) + +END() diff --git a/library/cpp/http/io/stream.cpp b/library/cpp/http/io/stream.cpp new file mode 100644 index 0000000000..6689be684f --- /dev/null +++ b/library/cpp/http/io/stream.cpp @@ -0,0 +1,1005 @@ +#include "stream.h" + +#include "compression.h" +#include "chunk.h" + +#include <util/stream/buffered.h> +#include <util/stream/length.h> +#include <util/stream/multi.h> +#include <util/stream/null.h> +#include <util/stream/tee.h> + +#include <util/system/compat.h> +#include <util/system/yassert.h> + +#include <util/network/socket.h> + +#include <util/string/cast.h> +#include <util/string/strip.h> + +#include <util/generic/string.h> +#include <util/generic/utility.h> +#include <util/generic/hash_set.h> +#include <util/generic/yexception.h> + +#define HEADERCMP(header, str) \ + case sizeof(str) - 1: \ + if (!stricmp((header).Name().data(), str)) + +namespace { + inline size_t SuggestBufferSize() { + return 8192; + } + + inline TStringBuf Trim(const char* b, const char* e) noexcept { + return StripString(TStringBuf(b, e)); + } + + inline TStringBuf RmSemiColon(const TStringBuf& s) { + return s.Before(';'); + } + + template <class T, size_t N> + class TStreams: private TNonCopyable { + struct TDelete { + inline void operator()(T* t) noexcept { + delete t; + } + }; + + typedef T* TPtr; + + public: + inline TStreams() noexcept + : Beg_(T_ + N) + { + } + + inline ~TStreams() { + TDelete f; + + ForEach(f); + } + + template <class S> + inline S* Add(S* t) noexcept { + return (S*)AddImpl((T*)t); + } + + template <class Functor> + inline void ForEach(Functor& f) { + const TPtr* end = T_ + N; + + for (TPtr* cur = Beg_; cur != end; ++cur) { + f(*cur); + } + } + + TPtr Top() { + const TPtr* end = T_ + N; + return end == Beg_ ? nullptr : *Beg_; + } + + private: + inline T* AddImpl(T* t) noexcept { + Y_ASSERT(Beg_ > T_); + + return (*--Beg_ = t); + } + + private: + TPtr T_[N]; + TPtr* Beg_; + }; + + template <class TStream> + class TLazy: public IOutputStream { + public: + TLazy(IOutputStream* out, ui16 bs) + : Output_(out) + , BlockSize_(bs) + { + } + + void DoWrite(const void* buf, size_t len) override { + ConstructSlave(); + Slave_->Write(buf, len); + } + + void DoFlush() override { + ConstructSlave(); + Slave_->Flush(); + } + + void DoFinish() override { + ConstructSlave(); + Slave_->Finish(); + } + + private: + inline void ConstructSlave() { + if (!Slave_) { + Slave_.Reset(new TStream(Output_, BlockSize_)); + } + } + + private: + IOutputStream* Output_; + ui16 BlockSize_; + THolder<IOutputStream> Slave_; + }; +} + +class THttpInput::TImpl { + typedef THashSet<TString> TAcceptCodings; + +public: + inline TImpl(IInputStream* slave) + : Slave_(slave) + , Buffered_(Slave_, SuggestBufferSize()) + , ChunkedInput_(nullptr) + , Input_(nullptr) + , FirstLine_(ReadFirstLine(Buffered_)) + , Headers_(&Buffered_) + , KeepAlive_(false) + , HasContentLength_(false) + , ContentLength_(0) + , ContentEncoded_(false) + , Expect100Continue_(false) + { + BuildInputChain(); + Y_ASSERT(Input_); + } + + static TString ReadFirstLine(TBufferedInput& in) { + TString s; + Y_ENSURE_EX(in.ReadLine(s), THttpReadException() << "Failed to get first line"); + return s; + } + + inline ~TImpl() { + } + + inline size_t Read(void* buf, size_t len) { + return Perform(len, [this, buf](size_t toRead) { return Input_->Read(buf, toRead); }); + } + + inline size_t Skip(size_t len) { + return Perform(len, [this](size_t toSkip) { return Input_->Skip(toSkip); }); + } + + inline const TString& FirstLine() const noexcept { + return FirstLine_; + } + + inline const THttpHeaders& Headers() const noexcept { + return Headers_; + } + + inline const TMaybe<THttpHeaders>& Trailers() const noexcept { + return Trailers_; + } + + inline bool IsKeepAlive() const noexcept { + return KeepAlive_; + } + + inline bool AcceptEncoding(const TString& s) const { + return Codings_.find(to_lower(s)) != Codings_.end(); + } + + inline bool GetContentLength(ui64& value) const noexcept { + if (HasContentLength_) { + value = ContentLength_; + return true; + } + return false; + } + + inline bool ContentEncoded() const noexcept { + return ContentEncoded_; + } + + inline bool HasContent() const noexcept { + return HasContentLength_ || ChunkedInput_; + } + + inline bool HasExpect100Continue() const noexcept { + return Expect100Continue_; + } + +private: + template <class Operation> + inline size_t Perform(size_t len, const Operation& operation) { + size_t processed = operation(len); + if (processed == 0 && len > 0) { + if (!ChunkedInput_) { + Trailers_.ConstructInPlace(); + } else { + // Read the header of the trailing chunk. It remains in + // the TChunkedInput stream if the HTTP response is compressed. + char symbol; + if (ChunkedInput_->Read(&symbol, 1) != 0) { + ythrow THttpParseException() << "some data remaining in TChunkedInput"; + } + } + } + return processed; + } + + struct TParsedHeaders { + bool Chunked = false; + bool KeepAlive = false; + TStringBuf LZipped; + }; + + struct TTrEnc { + inline void operator()(const TStringBuf& s) { + if (s == TStringBuf("chunked")) { + p->Chunked = true; + } + } + + TParsedHeaders* p; + }; + + struct TAccCoding { + inline void operator()(const TStringBuf& s) { + c->insert(ToString(s)); + } + + TAcceptCodings* c; + }; + + template <class Functor> + inline void ForEach(TString in, Functor& f) { + in.to_lower(); + + const char* b = in.begin(); + const char* c = b; + const char* e = in.end(); + + while (c != e) { + if (*c == ',') { + f(RmSemiColon(Trim(b, c))); + b = c + 1; + } + + ++c; + } + + if (b != c) { + f(RmSemiColon(Trim(b, c))); + } + } + + inline bool IsRequest() const { + return strnicmp(FirstLine().data(), "get", 3) == 0 || + strnicmp(FirstLine().data(), "post", 4) == 0 || + strnicmp(FirstLine().data(), "put", 3) == 0 || + strnicmp(FirstLine().data(), "patch", 5) == 0 || + strnicmp(FirstLine().data(), "head", 4) == 0 || + strnicmp(FirstLine().data(), "delete", 6) == 0; + } + + inline void BuildInputChain() { + TParsedHeaders p; + + size_t pos = FirstLine_.rfind(' '); + // In HTTP/1.1 Keep-Alive is turned on by default + if (pos != TString::npos && strcmp(FirstLine_.c_str() + pos + 1, "HTTP/1.1") == 0) { + p.KeepAlive = true; //request + } else if (strnicmp(FirstLine_.data(), "HTTP/1.1", 8) == 0) { + p.KeepAlive = true; //reply + } + + for (THttpHeaders::TConstIterator h = Headers_.Begin(); h != Headers_.End(); ++h) { + const THttpInputHeader& header = *h; + switch (header.Name().size()) { + HEADERCMP(header, "transfer-encoding") { + TTrEnc f = {&p}; + ForEach(header.Value(), f); + } + break; + HEADERCMP(header, "content-encoding") { + p.LZipped = header.Value(); + } + break; + HEADERCMP(header, "accept-encoding") { + TAccCoding f = {&Codings_}; + ForEach(header.Value(), f); + } + break; + HEADERCMP(header, "content-length") { + HasContentLength_ = true; + ContentLength_ = FromString(header.Value()); + } + break; + HEADERCMP(header, "connection") { + // accept header "Connection: Keep-Alive, TE" + if (strnicmp(header.Value().data(), "keep-alive", 10) == 0) { + p.KeepAlive = true; + } else if (stricmp(header.Value().data(), "close") == 0) { + p.KeepAlive = false; + } + } + [[fallthrough]]; + HEADERCMP(header, "expect") { + auto findContinue = [&](const TStringBuf& s) { + if (strnicmp(s.data(), "100-continue", 13) == 0) { + Expect100Continue_ = true; + } + }; + ForEach(header.Value(), findContinue); + } + break; + } + } + + if (p.Chunked) { + ChunkedInput_ = Streams_.Add(new TChunkedInput(&Buffered_, &Trailers_)); + Input_ = ChunkedInput_; + } else { + // disable buffering + Buffered_.Reset(&Cnull); + Input_ = Streams_.Add(new TMultiInput(&Buffered_, Slave_)); + + if (IsRequest() || HasContentLength_) { + /* + * TODO - we have other cases + */ + Input_ = Streams_.Add(new TLengthLimitedInput(Input_, ContentLength_)); + } + } + + if (auto decoder = TCompressionCodecFactory::Instance().FindDecoder(p.LZipped)) { + ContentEncoded_ = true; + Input_ = Streams_.Add((*decoder)(Input_).Release()); + } + + KeepAlive_ = p.KeepAlive; + } + +private: + IInputStream* Slave_; + + /* + * input helpers + */ + TBufferedInput Buffered_; + TStreams<IInputStream, 8> Streams_; + IInputStream* ChunkedInput_; + + /* + * final input stream + */ + IInputStream* Input_; + + TString FirstLine_; + THttpHeaders Headers_; + TMaybe<THttpHeaders> Trailers_; + bool KeepAlive_; + + TAcceptCodings Codings_; + + bool HasContentLength_; + ui64 ContentLength_; + + bool ContentEncoded_; + bool Expect100Continue_; +}; + +THttpInput::THttpInput(IInputStream* slave) + : Impl_(new TImpl(slave)) +{ +} + +THttpInput::THttpInput(THttpInput&& httpInput) = default; + +THttpInput::~THttpInput() { +} + +size_t THttpInput::DoRead(void* buf, size_t len) { + return Impl_->Read(buf, len); +} + +size_t THttpInput::DoSkip(size_t len) { + return Impl_->Skip(len); +} + +const THttpHeaders& THttpInput::Headers() const noexcept { + return Impl_->Headers(); +} + +const TMaybe<THttpHeaders>& THttpInput::Trailers() const noexcept { + return Impl_->Trailers(); +} + +const TString& THttpInput::FirstLine() const noexcept { + return Impl_->FirstLine(); +} + +bool THttpInput::IsKeepAlive() const noexcept { + return Impl_->IsKeepAlive(); +} + +bool THttpInput::AcceptEncoding(const TString& coding) const { + return Impl_->AcceptEncoding(coding); +} + +TString THttpInput::BestCompressionScheme(TArrayRef<const TStringBuf> codings) const { + return NHttp::ChooseBestCompressionScheme( + [this](const TString& coding) { + return AcceptEncoding(coding); + }, + codings + ); +} + +TString THttpInput::BestCompressionScheme() const { + return BestCompressionScheme(TCompressionCodecFactory::Instance().GetBestCodecs()); +} + +bool THttpInput::GetContentLength(ui64& value) const noexcept { + return Impl_->GetContentLength(value); +} + +bool THttpInput::ContentEncoded() const noexcept { + return Impl_->ContentEncoded(); +} + +bool THttpInput::HasContent() const noexcept { + return Impl_->HasContent(); +} + +bool THttpInput::HasExpect100Continue() const noexcept { + return Impl_->HasExpect100Continue(); +} + +class THttpOutput::TImpl { + class TSizeCalculator: public IOutputStream { + public: + inline TSizeCalculator() noexcept { + } + + ~TSizeCalculator() override { + } + + void DoWrite(const void* /*buf*/, size_t len) override { + Length_ += len; + } + + inline size_t Length() const noexcept { + return Length_; + } + + private: + size_t Length_ = 0; + }; + + enum TState { + Begin = 0, + FirstLineSent = 1, + HeadersSent = 2 + }; + + struct TFlush { + inline void operator()(IOutputStream* s) { + s->Flush(); + } + }; + + struct TFinish { + inline void operator()(IOutputStream* s) { + s->Finish(); + } + }; + +public: + inline TImpl(IOutputStream* slave, THttpInput* request) + : Slave_(slave) + , State_(Begin) + , Output_(Slave_) + , Request_(request) + , Version_(1100) + , KeepAliveEnabled_(false) + , BodyEncodingEnabled_(true) + , CompressionHeaderEnabled_(true) + , Finished_(false) + { + } + + inline ~TImpl() { + } + + inline void SendContinue() { + Output_->Write("HTTP/1.1 100 Continue\r\n\r\n"); + Output_->Flush(); + } + + inline void Write(const void* buf, size_t len) { + if (Finished_) { + ythrow THttpException() << "can not write to finished stream"; + } + + if (State_ == HeadersSent) { + Output_->Write(buf, len); + + return; + } + + const char* b = (const char*)buf; + const char* e = b + len; + const char* c = b; + + while (c != e) { + if (*c == '\n') { + Line_.append(b, c); + + if (!Line_.empty() && Line_.back() == '\r') { + Line_.pop_back(); + } + + b = c + 1; + + Process(Line_); + + if (State_ == HeadersSent) { + Output_->Write(b, e - b); + + return; + } + + Line_.clear(); + } + + ++c; + } + + if (b != c) { + Line_.append(b, c); + } + } + + inline void Flush() { + TFlush f; + Streams_.ForEach(f); + Slave_->Flush(); // see SEARCH-1030 + } + + inline void Finish() { + if (Finished_) { + return; + } + + TFinish f; + Streams_.ForEach(f); + Slave_->Finish(); // see SEARCH-1030 + + Finished_ = true; + } + + inline const THttpHeaders& SentHeaders() const noexcept { + return Headers_; + } + + inline void EnableCompression(TArrayRef<const TStringBuf> schemas) { + ComprSchemas_ = schemas; + } + + inline void EnableKeepAlive(bool enable) { + KeepAliveEnabled_ = enable; + } + + inline void EnableBodyEncoding(bool enable) { + BodyEncodingEnabled_ = enable; + } + + inline void EnableCompressionHeader(bool enable) { + CompressionHeaderEnabled_ = enable; + } + + inline bool IsCompressionEnabled() const noexcept { + return !ComprSchemas_.empty(); + } + + inline bool IsKeepAliveEnabled() const noexcept { + return KeepAliveEnabled_; + } + + inline bool IsBodyEncodingEnabled() const noexcept { + return BodyEncodingEnabled_; + } + + inline bool IsCompressionHeaderEnabled() const noexcept { + return CompressionHeaderEnabled_; + } + + inline bool CanBeKeepAlive() const noexcept { + return SupportChunkedTransfer() && IsKeepAliveEnabled() && (Request_ ? Request_->IsKeepAlive() : true); + } + + inline const TString& FirstLine() const noexcept { + return FirstLine_; + } + + inline size_t SentSize() const noexcept { + return SizeCalculator_.Length(); + } + +private: + static inline bool IsResponse(const TString& s) noexcept { + return strnicmp(s.data(), "HTTP/", 5) == 0; + } + + static inline bool IsRequest(const TString& s) noexcept { + return !IsResponse(s); + } + + inline bool IsHttpRequest() const noexcept { + return IsRequest(FirstLine_); + } + + inline bool HasResponseBody() const noexcept { + if (IsHttpResponse()) { + if (Request_ && Request_->FirstLine().StartsWith(TStringBuf("HEAD"))) + return false; + if (FirstLine_.size() > 9 && strncmp(FirstLine_.data() + 9, "204", 3) == 0) + return false; + return true; + } + return false; + } + + inline bool IsHttpResponse() const noexcept { + return IsResponse(FirstLine_); + } + + inline bool HasRequestBody() const noexcept { + return strnicmp(FirstLine_.data(), "POST", 4) == 0 || + strnicmp(FirstLine_.data(), "PATCH", 5) == 0 || + strnicmp(FirstLine_.data(), "PUT", 3) == 0; + } + static inline size_t ParseHttpVersion(const TString& s) { + if (s.empty()) { + ythrow THttpParseException() << "malformed http stream"; + } + + size_t parsed_version = 0; + + if (IsResponse(s)) { + const char* b = s.data() + 5; + + while (*b && *b != ' ') { + if (*b != '.') { + parsed_version *= 10; + parsed_version += (*b - '0'); + } + + ++b; + } + } else { + /* + * s not empty here + */ + const char* e = s.end() - 1; + const char* b = s.begin(); + size_t mult = 1; + + while (e != b && *e != '/') { + if (*e != '.') { + parsed_version += (*e - '0') * mult; + mult *= 10; + } + + --e; + } + } + + return parsed_version * 100; + } + + inline void ParseHttpVersion() { + size_t parsed_version = ParseHttpVersion(FirstLine_); + + if (Request_) { + parsed_version = Min(parsed_version, ParseHttpVersion(Request_->FirstLine())); + } + + Version_ = parsed_version; + } + + inline void Process(const TString& s) { + Y_ASSERT(State_ != HeadersSent); + + if (State_ == Begin) { + FirstLine_ = s; + ParseHttpVersion(); + State_ = FirstLineSent; + } else { + if (s.empty()) { + BuildOutputStream(); + WriteCached(); + State_ = HeadersSent; + } else { + AddHeader(THttpInputHeader(s)); + } + } + } + + inline void WriteCachedImpl(IOutputStream* s) const { + s->Write(FirstLine_.data(), FirstLine_.size()); + s->Write("\r\n", 2); + Headers_.OutTo(s); + s->Write("\r\n", 2); + s->Finish(); + } + + inline void WriteCached() { + size_t buflen = 0; + + { + TSizeCalculator out; + + WriteCachedImpl(&out); + buflen = out.Length(); + } + + { + TBufferedOutput out(Slave_, buflen); + + WriteCachedImpl(&out); + } + + if (IsHttpRequest() && !HasRequestBody()) { + /* + * if this is http request, then send it now + */ + + Slave_->Flush(); + } + } + + inline bool SupportChunkedTransfer() const noexcept { + return Version_ >= 1100; + } + + inline void BuildOutputStream() { + if (CanBeKeepAlive()) { + AddOrReplaceHeader(THttpInputHeader("Connection", "Keep-Alive")); + } else { + AddOrReplaceHeader(THttpInputHeader("Connection", "Close")); + } + + if (IsHttpResponse()) { + if (Request_ && IsCompressionEnabled() && HasResponseBody()) { + TString scheme = Request_->BestCompressionScheme(ComprSchemas_); + if (scheme != "identity") { + AddOrReplaceHeader(THttpInputHeader("Content-Encoding", scheme)); + RemoveHeader("Content-Length"); + } + } + + RebuildStream(); + } else { + if (IsCompressionEnabled()) { + AddOrReplaceHeader(THttpInputHeader("Accept-Encoding", BuildAcceptEncoding())); + } + if (HasRequestBody()) { + RebuildStream(); + } + } + } + + inline TString BuildAcceptEncoding() const { + TString ret; + + for (const auto& coding : ComprSchemas_) { + if (ret) { + ret += ", "; + } + + ret += coding; + } + + return ret; + } + + inline void RebuildStream() { + bool keepAlive = false; + const TCompressionCodecFactory::TEncoderConstructor* encoder = nullptr; + bool chunked = false; + bool haveContentLength = false; + + for (THttpHeaders::TConstIterator h = Headers_.Begin(); h != Headers_.End(); ++h) { + const THttpInputHeader& header = *h; + const TString hl = to_lower(header.Name()); + + if (hl == TStringBuf("connection")) { + keepAlive = to_lower(header.Value()) == TStringBuf("keep-alive"); + } else if (IsCompressionHeaderEnabled() && hl == TStringBuf("content-encoding")) { + encoder = TCompressionCodecFactory::Instance().FindEncoder(to_lower(header.Value())); + } else if (hl == TStringBuf("transfer-encoding")) { + chunked = to_lower(header.Value()) == TStringBuf("chunked"); + } else if (hl == TStringBuf("content-length")) { + haveContentLength = true; + } + } + + if (!haveContentLength && !chunked && (IsHttpRequest() || HasResponseBody()) && SupportChunkedTransfer() && (keepAlive || encoder || IsHttpRequest())) { + AddHeader(THttpInputHeader("Transfer-Encoding", "chunked")); + chunked = true; + } + + if (IsBodyEncodingEnabled() && chunked) { + Output_ = Streams_.Add(new TChunkedOutput(Output_)); + } + + Output_ = Streams_.Add(new TTeeOutput(Output_, &SizeCalculator_)); + + if (IsBodyEncodingEnabled() && encoder) { + Output_ = Streams_.Add((*encoder)(Output_).Release()); + } + } + + inline void AddHeader(const THttpInputHeader& hdr) { + Headers_.AddHeader(hdr); + } + + inline void AddOrReplaceHeader(const THttpInputHeader& hdr) { + Headers_.AddOrReplaceHeader(hdr); + } + + inline void RemoveHeader(const TString& hdr) { + Headers_.RemoveHeader(hdr); + } + +private: + IOutputStream* Slave_; + TState State_; + IOutputStream* Output_; + TStreams<IOutputStream, 8> Streams_; + TString Line_; + TString FirstLine_; + THttpHeaders Headers_; + THttpInput* Request_; + size_t Version_; + + TArrayRef<const TStringBuf> ComprSchemas_; + + bool KeepAliveEnabled_; + bool BodyEncodingEnabled_; + bool CompressionHeaderEnabled_; + + bool Finished_; + + TSizeCalculator SizeCalculator_; +}; + +THttpOutput::THttpOutput(IOutputStream* slave) + : Impl_(new TImpl(slave, nullptr)) +{ +} + +THttpOutput::THttpOutput(IOutputStream* slave, THttpInput* request) + : Impl_(new TImpl(slave, request)) +{ +} + +THttpOutput::~THttpOutput() { + try { + Finish(); + } catch (...) { + } +} + +void THttpOutput::DoWrite(const void* buf, size_t len) { + Impl_->Write(buf, len); +} + +void THttpOutput::DoFlush() { + Impl_->Flush(); +} + +void THttpOutput::DoFinish() { + Impl_->Finish(); +} + +const THttpHeaders& THttpOutput::SentHeaders() const noexcept { + return Impl_->SentHeaders(); +} + +void THttpOutput::EnableCompression(bool enable) { + if (enable) { + EnableCompression(TCompressionCodecFactory::Instance().GetBestCodecs()); + } else { + TArrayRef<TStringBuf> codings; + EnableCompression(codings); + } +} + +void THttpOutput::EnableCompression(TArrayRef<const TStringBuf> schemas) { + Impl_->EnableCompression(schemas); +} + +void THttpOutput::EnableKeepAlive(bool enable) { + Impl_->EnableKeepAlive(enable); +} + +void THttpOutput::EnableBodyEncoding(bool enable) { + Impl_->EnableBodyEncoding(enable); +} + +void THttpOutput::EnableCompressionHeader(bool enable) { + Impl_->EnableCompressionHeader(enable); +} + +bool THttpOutput::IsKeepAliveEnabled() const noexcept { + return Impl_->IsKeepAliveEnabled(); +} + +bool THttpOutput::IsBodyEncodingEnabled() const noexcept { + return Impl_->IsBodyEncodingEnabled(); +} + +bool THttpOutput::IsCompressionEnabled() const noexcept { + return Impl_->IsCompressionEnabled(); +} + +bool THttpOutput::IsCompressionHeaderEnabled() const noexcept { + return Impl_->IsCompressionHeaderEnabled(); +} + +bool THttpOutput::CanBeKeepAlive() const noexcept { + return Impl_->CanBeKeepAlive(); +} + +void THttpOutput::SendContinue() { + Impl_->SendContinue(); +} + +const TString& THttpOutput::FirstLine() const noexcept { + return Impl_->FirstLine(); +} + +size_t THttpOutput::SentSize() const noexcept { + return Impl_->SentSize(); +} + +unsigned ParseHttpRetCode(const TStringBuf& ret) { + const TStringBuf code = StripString(StripString(ret.After(' ')).Before(' ')); + + return FromString<unsigned>(code.data(), code.size()); +} + +void SendMinimalHttpRequest(TSocket& s, const TStringBuf& host, const TStringBuf& request, const TStringBuf& agent, const TStringBuf& from) { + TSocketOutput so(s); + THttpOutput output(&so); + + output.EnableKeepAlive(false); + output.EnableCompression(false); + + const IOutputStream::TPart parts[] = { + IOutputStream::TPart(TStringBuf("GET ")), + IOutputStream::TPart(request), + IOutputStream::TPart(TStringBuf(" HTTP/1.1")), + IOutputStream::TPart::CrLf(), + IOutputStream::TPart(TStringBuf("Host: ")), + IOutputStream::TPart(host), + IOutputStream::TPart::CrLf(), + IOutputStream::TPart(TStringBuf("User-Agent: ")), + IOutputStream::TPart(agent), + IOutputStream::TPart::CrLf(), + IOutputStream::TPart(TStringBuf("From: ")), + IOutputStream::TPart(from), + IOutputStream::TPart::CrLf(), + IOutputStream::TPart::CrLf(), + }; + + output.Write(parts, sizeof(parts) / sizeof(*parts)); + output.Finish(); +} + +TArrayRef<const TStringBuf> SupportedCodings() { + return TCompressionCodecFactory::Instance().GetBestCodecs(); +} diff --git a/library/cpp/http/io/stream.h b/library/cpp/http/io/stream.h new file mode 100644 index 0000000000..78ca4fc814 --- /dev/null +++ b/library/cpp/http/io/stream.h @@ -0,0 +1,178 @@ +#pragma once + +#include "headers.h" + +#include <util/stream/output.h> +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/strbuf.h> +#include <util/generic/yexception.h> +#include <util/generic/array_ref.h> + +class TSocket; + +struct THttpException: public yexception { +}; + +struct THttpParseException: public THttpException { +}; + +struct THttpReadException: public THttpException { +}; + +/// Чтение ответа HTTP-сервера. +class THttpInput: public IInputStream { +public: + THttpInput(IInputStream* slave); + THttpInput(THttpInput&& httpInput); + ~THttpInput() override; + + /* + * parsed http headers + */ + /// Возвращает контейнер с заголовками ответа HTTP-сервера. + const THttpHeaders& Headers() const noexcept; + + /* + * parsed http trailers + */ + /// Возвращает контейнер (возможно пустой) с trailer'ами ответа HTTP-сервера. + /// Поток должен быть вычитан полностью прежде чем trailer'ы будут доступны. + /// Пока поток не вычитан до конца возвращается Nothing. + /// https://tools.ietf.org/html/rfc7230#section-4.1.2 + const TMaybe<THttpHeaders>& Trailers() const noexcept; + + /* + * first line - response or request + */ + /// Возвращает первую строку ответа HTTP-сервера. + /// @details Первая строка HTTP-сервера - строка состояния, + /// содержащая три поля: версию HTTP, код состояния и описание. + const TString& FirstLine() const noexcept; + + /* + * connection can be keep-alive + */ + /// Проверяет, не завершено ли соединение с сервером. + /// @details Транзакция считается завершенной, если не передан заголовок + /// "Connection: Keep Alive". + bool IsKeepAlive() const noexcept; + + /* + * output data can be encoded + */ + /// Проверяет, поддерживается ли данный тип кодирования содержимого + /// ответа HTTP-сервера. + bool AcceptEncoding(const TString& coding) const; + + /// Пытается определить наилучший тип кодирования ответа HTTP-сервера. + /// @details Если ответ сервера говорит о том, что поддерживаются + /// любые типы кодирования, выбирается gzip. В противном случае + /// из списка типов кодирования выбирается лучший из поддерживаемых сервером. + TString BestCompressionScheme() const; + TString BestCompressionScheme(TArrayRef<const TStringBuf> codings) const; + + /// Если заголовки содержат Content-Length, возвращает true и + /// записывает значение из заголовка в value + bool GetContentLength(ui64& value) const noexcept; + + /// Признак запакованности данных, - если выставлен, то Content-Length, при наличии в заголовках, + /// показывает объём запакованных данных, а из THttpInput мы будем вычитывать уже распакованные. + bool ContentEncoded() const noexcept; + + /// Returns true if Content-Length or Transfer-Encoding header received + bool HasContent() const noexcept; + + bool HasExpect100Continue() const noexcept; + +private: + size_t DoRead(void* buf, size_t len) override; + size_t DoSkip(size_t len) override; + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + +/// Передача запроса HTTP-серверу. +class THttpOutput: public IOutputStream { +public: + THttpOutput(IOutputStream* slave); + THttpOutput(IOutputStream* slave, THttpInput* request); + ~THttpOutput() override; + + /* + * sent http headers + */ + /// Возвращает контейнер с заголовками запроса к HTTP-серверу. + const THttpHeaders& SentHeaders() const noexcept; + + /// Устанавливает режим, при котором сервер выдает ответ в упакованном виде. + void EnableCompression(bool enable); + void EnableCompression(TArrayRef<const TStringBuf> schemas); + + /// Устанавливает режим, при котором соединение с сервером не завершается + /// после окончания транзакции. + void EnableKeepAlive(bool enable); + + /// Устанавливает режим, при котором тело HTTP-запроса/ответа преобразуется в соответствии + /// с заголовками Content-Encoding и Transfer-Encoding (включен по умолчанию) + void EnableBodyEncoding(bool enable); + + /// Устанавливает режим, при котором тело HTTP-ответа сжимается кодеком + /// указанным в Content-Encoding (включен по умолчанию) + void EnableCompressionHeader(bool enable); + + /// Проверяет, производится ли выдача ответов в упакованном виде. + bool IsCompressionEnabled() const noexcept; + + /// Проверяет, не завершается ли соединение с сервером после окончания транзакции. + bool IsKeepAliveEnabled() const noexcept; + + /// Проверяет, преобразуется ли тело HTTP-запроса/ответа в соответствии + /// с заголовками Content-Encoding и Transfer-Encoding + bool IsBodyEncodingEnabled() const noexcept; + + /// Проверяет, сжимается ли тело HTTP-ответа кодеком + /// указанным в Content-Encoding + bool IsCompressionHeaderEnabled() const noexcept; + + /* + * is this connection can be really keep-alive + */ + /// Проверяет, можно ли установить режим, при котором соединение с сервером + /// не завершается после окончания транзакции. + bool CanBeKeepAlive() const noexcept; + + void SendContinue(); + + /* + * first line - response or request + */ + /// Возвращает первую строку HTTP-запроса/ответа + const TString& FirstLine() const noexcept; + + /// Возвращает размер отправленных данных (без заголовков, с учётом сжатия, без + /// учёта chunked transfer encoding) + size_t SentSize() const noexcept; + +private: + void DoWrite(const void* buf, size_t len) override; + void DoFlush() override; + void DoFinish() override; + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + +/// Возвращает код состояния из ответа сервера. +unsigned ParseHttpRetCode(const TStringBuf& ret); + +/// Отправляет HTTP-серверу запрос с минимумом необходимых заголовков. +void SendMinimalHttpRequest(TSocket& s, const TStringBuf& host, const TStringBuf& request, const TStringBuf& agent = "YandexSomething/1.0", const TStringBuf& from = "webadmin@yandex.ru"); + +TArrayRef<const TStringBuf> SupportedCodings(); + +/// @} diff --git a/library/cpp/http/io/stream_ut.cpp b/library/cpp/http/io/stream_ut.cpp new file mode 100644 index 0000000000..1ea35df675 --- /dev/null +++ b/library/cpp/http/io/stream_ut.cpp @@ -0,0 +1,732 @@ +#include "stream.h" +#include "chunk.h" + +#include <library/cpp/http/server/http_ex.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/testing/unittest/tests_data.h> + +#include <util/string/printf.h> +#include <util/network/socket.h> +#include <util/stream/file.h> +#include <util/stream/output.h> +#include <util/stream/tee.h> +#include <util/stream/zlib.h> +#include <util/stream/null.h> + +Y_UNIT_TEST_SUITE(THttpStreamTest) { + class TTestHttpServer: public THttpServer::ICallBack { + class TRequest: public THttpClientRequestEx { + public: + inline TRequest(TTestHttpServer* parent) + : Parent_(parent) + { + } + + bool Reply(void* /*tsr*/) override { + if (!ProcessHeaders()) { + return true; + } + + // Check that function will not hang on + Input().ReadAll(); + + // "lo" is for "local" + if (RD.ServerName() == "yandex.lo") { + // do redirect + Output() << "HTTP/1.1 301 Moved permanently\r\n" + "Location: http://www.yandex.lo\r\n" + "\r\n"; + } else if (RD.ServerName() == "www.yandex.lo") { + Output() << "HTTP/1.1 200 Ok\r\n" + "\r\n"; + } else { + Output() << "HTTP/1.1 200 Ok\r\n\r\n"; + if (Buf.Size()) { + Output().Write(Buf.AsCharPtr(), Buf.Size()); + } else { + Output() << Parent_->Res_; + } + } + Output().Finish(); + + Parent_->LastRequestSentSize_ = Output().SentSize(); + + return true; + } + + private: + TTestHttpServer* Parent_ = nullptr; + }; + + public: + inline TTestHttpServer(const TString& res) + : Res_(res) + { + } + + TClientRequest* CreateClient() override { + return new TRequest(this); + } + + size_t LastRequestSentSize() const { + return LastRequestSentSize_; + } + + private: + TString Res_; + size_t LastRequestSentSize_ = 0; + }; + + Y_UNIT_TEST(TestCodings1) { + UNIT_ASSERT(SupportedCodings().size() > 0); + } + + Y_UNIT_TEST(TestHttpInput) { + TString res = "I'm a teapot"; + TPortManager pm; + const ui16 port = pm.GetPort(); + + TTestHttpServer serverImpl(res); + THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true)); + + UNIT_ASSERT(server.Start()); + + TNetworkAddress addr("localhost", port); + TSocket s(addr); + + //TDebugOutput dbg; + TNullOutput dbg; + + { + TSocketOutput so(s); + TTeeOutput out(&so, &dbg); + THttpOutput output(&out); + + output.EnableKeepAlive(true); + output.EnableCompression(true); + + TString r; + r += "GET / HTTP/1.1"; + r += "\r\n"; + r += "Host: yandex.lo"; + r += "\r\n"; + r += "\r\n"; + + output.Write(r.data(), r.size()); + output.Finish(); + } + + { + TSocketInput si(s); + THttpInput input(&si); + unsigned httpCode = ParseHttpRetCode(input.FirstLine()); + UNIT_ASSERT_VALUES_EQUAL(httpCode / 10, 30u); + + TransferData(&input, &dbg); + } + server.Stop(); + } + + Y_UNIT_TEST(TestHttpInputDelete) { + TString res = "I'm a teapot"; + TPortManager pm; + const ui16 port = pm.GetPort(); + + TTestHttpServer serverImpl(res); + THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true)); + + UNIT_ASSERT(server.Start()); + + TNetworkAddress addr("localhost", port); + TSocket s(addr); + + //TDebugOutput dbg; + TNullOutput dbg; + + { + TSocketOutput so(s); + TTeeOutput out(&so, &dbg); + THttpOutput output(&out); + + output.EnableKeepAlive(true); + output.EnableCompression(true); + + TString r; + r += "DELETE / HTTP/1.1"; + r += "\r\n"; + r += "Host: yandex.lo"; + r += "\r\n"; + r += "\r\n"; + + output.Write(r.data(), r.size()); + output.Finish(); + } + + { + TSocketInput si(s); + THttpInput input(&si); + unsigned httpCode = ParseHttpRetCode(input.FirstLine()); + UNIT_ASSERT_VALUES_EQUAL(httpCode / 10, 30u); + + TransferData(&input, &dbg); + } + server.Stop(); + } + + Y_UNIT_TEST(TestParseHttpRetCode) { + UNIT_ASSERT_VALUES_EQUAL(ParseHttpRetCode("HTTP/1.1 301"), 301u); + } + + Y_UNIT_TEST(TestKeepAlive) { + { + TString s = "GET / HTTP/1.0\r\n\r\n"; + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(!in.IsKeepAlive()); + } + + { + TString s = "GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n"; + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(in.IsKeepAlive()); + } + + { + TString s = "GET / HTTP/1.1\r\n\r\n"; + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(in.IsKeepAlive()); + } + + { + TString s = "GET / HTTP/1.1\r\nConnection: close\r\n\r\n"; + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(!in.IsKeepAlive()); + } + + { + TString s = "HTTP/1.0 200 Ok\r\n\r\n"; + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(!in.IsKeepAlive()); + } + + { + TString s = "HTTP/1.0 200 Ok\r\nConnection: keep-alive\r\n\r\n"; + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(in.IsKeepAlive()); + } + + { + TString s = "HTTP/1.1 200 Ok\r\n\r\n"; + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(in.IsKeepAlive()); + } + + { + TString s = "HTTP/1.1 200 Ok\r\nConnection: close\r\n\r\n"; + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(!in.IsKeepAlive()); + } + } + + Y_UNIT_TEST(TestMinRequest) { + TString res = "qqqqqq"; + TPortManager pm; + const ui16 port = pm.GetPort(); + + TTestHttpServer serverImpl(res); + THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true)); + + UNIT_ASSERT(server.Start()); + + TNetworkAddress addr("localhost", port); + + TSocket s(addr); + TNullOutput dbg; + + SendMinimalHttpRequest(s, "www.yandex.lo", "/"); + + TSocketInput si(s); + THttpInput input(&si); + unsigned httpCode = ParseHttpRetCode(input.FirstLine()); + UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u); + + TransferData(&input, &dbg); + server.Stop(); + } + + Y_UNIT_TEST(TestResponseWithBlanks) { + TString res = "qqqqqq\r\n\r\nsdasdsad\r\n"; + TPortManager pm; + const ui16 port = pm.GetPort(); + + TTestHttpServer serverImpl(res); + THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true)); + + UNIT_ASSERT(server.Start()); + + TNetworkAddress addr("localhost", port); + + TSocket s(addr); + + SendMinimalHttpRequest(s, "www.yandex.ru", "/"); + + TSocketInput si(s); + THttpInput input(&si); + unsigned httpCode = ParseHttpRetCode(input.FirstLine()); + UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u); + TString reply = input.ReadAll(); + UNIT_ASSERT_VALUES_EQUAL(reply, res); + server.Stop(); + } + + Y_UNIT_TEST(TestOutputFlush) { + TString str; + TStringOutput strOut(str); + TBufferedOutput bufOut(&strOut, 8192); + THttpOutput httpOut(&bufOut); + + httpOut.EnableKeepAlive(true); + httpOut.EnableCompression(true); + + const char* header = "GET / HTTP/1.1\r\nHost: yandex.ru\r\n\r\n"; + httpOut << header; + + unsigned curLen = str.size(); + const char* body = "<html>Hello</html>"; + httpOut << body; + UNIT_ASSERT_VALUES_EQUAL(curLen, str.size()); + httpOut.Flush(); + UNIT_ASSERT_VALUES_EQUAL(curLen + strlen(body), str.size()); + } + + Y_UNIT_TEST(TestOutputPostFlush) { + TString str; + TString checkStr; + TStringOutput strOut(str); + TStringOutput checkOut(checkStr); + TBufferedOutput bufOut(&strOut, 8192); + TTeeOutput teeOut(&bufOut, &checkOut); + THttpOutput httpOut(&teeOut); + + httpOut.EnableKeepAlive(true); + httpOut.EnableCompression(true); + + const char* header = "POST / HTTP/1.1\r\nHost: yandex.ru\r\n\r\n"; + httpOut << header; + + UNIT_ASSERT_VALUES_EQUAL(str.size(), 0u); + + const char* body = "<html>Hello</html>"; + httpOut << body; + UNIT_ASSERT_VALUES_EQUAL(str.size(), 0u); + + httpOut.Flush(); + UNIT_ASSERT_VALUES_EQUAL(checkStr.size(), str.size()); + } + + TString MakeHttpOutputBody(const char* body, bool encodingEnabled) { + TString str; + TStringOutput strOut(str); + { + TBufferedOutput bufOut(&strOut, 8192); + THttpOutput httpOut(&bufOut); + + httpOut.EnableKeepAlive(true); + httpOut.EnableCompression(true); + httpOut.EnableBodyEncoding(encodingEnabled); + + httpOut << "POST / HTTP/1.1\r\n"; + httpOut << "Host: yandex.ru\r\n"; + httpOut << "Content-Encoding: gzip\r\n"; + httpOut << "\r\n"; + + UNIT_ASSERT_VALUES_EQUAL(str.size(), 0u); + httpOut << body; + } + const char* bodyDelimiter = "\r\n\r\n"; + size_t bodyPos = str.find(bodyDelimiter); + UNIT_ASSERT(bodyPos != TString::npos); + return str.substr(bodyPos + strlen(bodyDelimiter)); + }; + + TString SimulateBodyEncoding(const char* body) { + TString bodyStr; + TStringOutput bodyOut(bodyStr); + TChunkedOutput chunkOut(&bodyOut); + TZLibCompress comprOut(&chunkOut, ZLib::GZip); + comprOut << body; + return bodyStr; + }; + + Y_UNIT_TEST(TestRebuildStreamOnPost) { + const char* body = "<html>Hello</html>"; + UNIT_ASSERT(MakeHttpOutputBody(body, false) == body); + UNIT_ASSERT(MakeHttpOutputBody(body, true) == SimulateBodyEncoding(body)); + } + + Y_UNIT_TEST(TestOutputFinish) { + TString str; + TStringOutput strOut(str); + TBufferedOutput bufOut(&strOut, 8192); + THttpOutput httpOut(&bufOut); + + httpOut.EnableKeepAlive(true); + httpOut.EnableCompression(true); + + const char* header = "GET / HTTP/1.1\r\nHost: yandex.ru\r\n\r\n"; + httpOut << header; + + unsigned curLen = str.size(); + const char* body = "<html>Hello</html>"; + httpOut << body; + UNIT_ASSERT_VALUES_EQUAL(curLen, str.size()); + httpOut.Finish(); + UNIT_ASSERT_VALUES_EQUAL(curLen + strlen(body), str.size()); + } + + Y_UNIT_TEST(TestMultilineHeaders) { + const char* headerLine0 = "HTTP/1.1 200 OK"; + const char* headerLine1 = "Content-Language: en"; + const char* headerLine2 = "Vary: Accept-Encoding, "; + const char* headerLine3 = "\tAccept-Language"; + const char* headerLine4 = "Content-Length: 18"; + + TString endLine("\r\n"); + TString r; + r += headerLine0 + endLine; + r += headerLine1 + endLine; + r += headerLine2 + endLine; + r += headerLine3 + endLine; + r += headerLine4 + endLine + endLine; + r += "<html>Hello</html>"; + TStringInput stringInput(r); + THttpInput input(&stringInput); + + const THttpHeaders& httpHeaders = input.Headers(); + UNIT_ASSERT_VALUES_EQUAL(httpHeaders.Count(), 3u); + + THttpHeaders::TConstIterator it = httpHeaders.Begin(); + UNIT_ASSERT_VALUES_EQUAL(it->ToString(), TString(headerLine1)); + UNIT_ASSERT_VALUES_EQUAL((++it)->ToString(), TString::Join(headerLine2, headerLine3)); + UNIT_ASSERT_VALUES_EQUAL((++it)->ToString(), TString(headerLine4)); + } + + Y_UNIT_TEST(ContentLengthRemoval) { + TMemoryInput request("GET / HTTP/1.1\r\nAccept-Encoding: gzip\r\n\r\n"); + THttpInput i(&request); + TString result; + TStringOutput out(result); + THttpOutput httpOut(&out, &i); + + httpOut.EnableKeepAlive(true); + httpOut.EnableCompression(true); + httpOut << "HTTP/1.1 200 OK\r\n"; + char answer[] = "Mary had a little lamb."; + httpOut << "Content-Length: " << strlen(answer) << "\r\n" + "\r\n"; + httpOut << answer; + httpOut.Finish(); + + Cdbg << result; + result.to_lower(); + UNIT_ASSERT(result.Contains("content-encoding: gzip")); + UNIT_ASSERT(!result.Contains("content-length")); + } + + Y_UNIT_TEST(CodecsPriority) { + TMemoryInput request("GET / HTTP/1.1\r\nAccept-Encoding: gzip, br\r\n\r\n"); + TVector<TStringBuf> codecs = {"br", "gzip"}; + + THttpInput i(&request); + TString result; + TStringOutput out(result); + THttpOutput httpOut(&out, &i); + + httpOut.EnableKeepAlive(true); + httpOut.EnableCompression(codecs); + httpOut << "HTTP/1.1 200 OK\r\n"; + char answer[] = "Mary had a little lamb."; + httpOut << "Content-Length: " << strlen(answer) << "\r\n" + "\r\n"; + httpOut << answer; + httpOut.Finish(); + + Cdbg << result; + result.to_lower(); + UNIT_ASSERT(result.Contains("content-encoding: br")); + } + + Y_UNIT_TEST(CodecsPriority2) { + TMemoryInput request("GET / HTTP/1.1\r\nAccept-Encoding: gzip, br\r\n\r\n"); + TVector<TStringBuf> codecs = {"gzip", "br"}; + + THttpInput i(&request); + TString result; + TStringOutput out(result); + THttpOutput httpOut(&out, &i); + + httpOut.EnableKeepAlive(true); + httpOut.EnableCompression(codecs); + httpOut << "HTTP/1.1 200 OK\r\n"; + char answer[] = "Mary had a little lamb."; + httpOut << "Content-Length: " << strlen(answer) << "\r\n" + "\r\n"; + httpOut << answer; + httpOut.Finish(); + + Cdbg << result; + result.to_lower(); + UNIT_ASSERT(result.Contains("content-encoding: gzip")); + } + + Y_UNIT_TEST(HasTrailers) { + TMemoryInput response( + "HTTP/1.1 200 OK\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "3\r\n" + "foo" + "0\r\n" + "Bar: baz\r\n" + "\r\n"); + THttpInput i(&response); + TMaybe<THttpHeaders> trailers = i.Trailers(); + UNIT_ASSERT(!trailers.Defined()); + i.ReadAll(); + trailers = i.Trailers(); + UNIT_ASSERT_VALUES_EQUAL(trailers.GetRef().Count(), 1); + UNIT_ASSERT_VALUES_EQUAL(trailers.GetRef().Begin()->ToString(), "Bar: baz"); + } + + Y_UNIT_TEST(NoTrailersWithChunks) { + TMemoryInput response( + "HTTP/1.1 200 OK\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "3\r\n" + "foo" + "0\r\n" + "\r\n"); + THttpInput i(&response); + TMaybe<THttpHeaders> trailers = i.Trailers(); + UNIT_ASSERT(!trailers.Defined()); + i.ReadAll(); + trailers = i.Trailers(); + UNIT_ASSERT_VALUES_EQUAL(trailers.GetRef().Count(), 0); + } + + Y_UNIT_TEST(NoTrailersNoChunks) { + TMemoryInput response( + "HTTP/1.1 200 OK\r\n" + "Content-Length: 3\r\n" + "\r\n" + "bar"); + THttpInput i(&response); + TMaybe<THttpHeaders> trailers = i.Trailers(); + UNIT_ASSERT(!trailers.Defined()); + i.ReadAll(); + trailers = i.Trailers(); + UNIT_ASSERT_VALUES_EQUAL(trailers.GetRef().Count(), 0); + } + + Y_UNIT_TEST(RequestWithoutContentLength) { + TStringStream request; + { + THttpOutput httpOutput(&request); + httpOutput << "POST / HTTP/1.1\r\n" + "Host: yandex.ru\r\n" + "\r\n"; + httpOutput << "GGLOL"; + } + { + TStringInput input(request.Str()); + THttpInput httpInput(&input); + bool chunkedOrHasContentLength = false; + for (const auto& header : httpInput.Headers()) { + if (header.Name() == "Transfer-Encoding" && header.Value() == "chunked" || header.Name() == "Content-Length") { + chunkedOrHasContentLength = true; + } + } + + // If request doesn't contain neither Content-Length header nor Transfer-Encoding header + // then server considers message body length to be zero. + // (See https://tools.ietf.org/html/rfc7230#section-3.3.3) + UNIT_ASSERT(chunkedOrHasContentLength); + + UNIT_ASSERT_VALUES_EQUAL(httpInput.ReadAll(), "GGLOL"); + } + } + + Y_UNIT_TEST(TestInputHasContent) { + { + TStringStream request; + request << "POST / HTTP/1.1\r\n" + "Host: yandex.ru\r\n" + "\r\n"; + request << "HTTPDATA"; + + TStringInput input(request.Str()); + THttpInput httpInput(&input); + + UNIT_ASSERT(!httpInput.HasContent()); + UNIT_ASSERT_VALUES_EQUAL(httpInput.ReadAll(), ""); + } + + { + TStringStream request; + request << "POST / HTTP/1.1\r\n" + "Host: yandex.ru\r\n" + "Content-Length: 8" + "\r\n\r\n"; + request << "HTTPDATA"; + + TStringInput input(request.Str()); + THttpInput httpInput(&input); + + UNIT_ASSERT(httpInput.HasContent()); + UNIT_ASSERT_VALUES_EQUAL(httpInput.ReadAll(), "HTTPDATA"); + } + + { + TStringStream request; + request << "POST / HTTP/1.1\r\n" + "Host: yandex.ru\r\n" + "Transfer-Encoding: chunked" + "\r\n\r\n"; + request << "8\r\nHTTPDATA\r\n0\r\n"; + + TStringInput input(request.Str()); + THttpInput httpInput(&input); + + UNIT_ASSERT(httpInput.HasContent()); + UNIT_ASSERT_VALUES_EQUAL(httpInput.ReadAll(), "HTTPDATA"); + } + } + + Y_UNIT_TEST(TestHttpInputHeadRequest) { + class THeadOnlyInput: public IInputStream { + public: + THeadOnlyInput() = default; + + private: + size_t DoRead(void* buf, size_t len) override { + if (Eof_) { + ythrow yexception() << "should not read after EOF"; + } + + const size_t toWrite = Min(len, Data_.size() - Pos_); + if (toWrite == 0) { + Eof_ = true; + return 0; + } + + memcpy(buf, Data_.data() + Pos_, toWrite); + Pos_ += toWrite; + return toWrite; + } + + private: + TString Data_{TStringBuf("HEAD / HTTP/1.1\r\nHost: yandex.ru\r\n\r\n")}; + size_t Pos_{0}; + bool Eof_{false}; + }; + THeadOnlyInput input; + THttpInput httpInput(&input); + + UNIT_ASSERT(!httpInput.HasContent()); + UNIT_ASSERT_VALUES_EQUAL(httpInput.ReadAll(), ""); + } + + Y_UNIT_TEST(TestHttpOutputResponseToHeadRequestNoZeroChunk) { + TStringStream request; + request << "HEAD / HTTP/1.1\r\n" + "Host: yandex.ru\r\n" + "Connection: Keep-Alive\r\n" + "\r\n"; + + TStringInput input(request.Str()); + THttpInput httpInput(&input); + + TStringStream outBuf; + THttpOutput out(&outBuf, &httpInput); + out.EnableKeepAlive(true); + out << "HTTP/1.1 200 OK\r\nConnection: Keep-Alive\r\n\r\n"; + out << ""; + out.Finish(); + TString result = outBuf.Str(); + UNIT_ASSERT(!result.Contains(TStringBuf("0\r\n"))); + } + + Y_UNIT_TEST(TestHttpOutputDisableCompressionHeader) { + TMemoryInput request("GET / HTTP/1.1\r\nAccept-Encoding: gzip\r\n\r\n"); + const TString data = "qqqqqqqqqqqqqqqqqqqqqqqqqqqqqq"; + + THttpInput httpInput(&request); + TString result; + + { + TStringOutput output(result); + THttpOutput httpOutput(&output, &httpInput); + httpOutput.EnableCompressionHeader(false); + httpOutput << "HTTP/1.1 200 OK\r\n" + "content-encoding: gzip\r\n" + "\r\n" + data; + httpOutput.Finish(); + } + + UNIT_ASSERT(result.Contains("content-encoding: gzip")); + UNIT_ASSERT(result.Contains(data)); + } + + size_t DoTestHttpOutputSize(const TString& res, bool enableCompession) { + TTestHttpServer serverImpl(res); + TPortManager pm; + + const ui16 port = pm.GetPort(); + THttpServer server(&serverImpl, + THttpServer::TOptions(port) + .EnableKeepAlive(true) + .EnableCompression(enableCompession)); + UNIT_ASSERT(server.Start()); + + TNetworkAddress addr("localhost", port); + TSocket s(addr); + + { + TSocketOutput so(s); + THttpOutput out(&so); + out << "GET / HTTP/1.1\r\n" + "Host: www.yandex.ru\r\n" + "Connection: Keep-Alive\r\n" + "Accept-Encoding: gzip\r\n" + "\r\n"; + out.Finish(); + } + + TSocketInput si(s); + THttpInput input(&si); + + unsigned httpCode = ParseHttpRetCode(input.FirstLine()); + UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u); + + UNIT_ASSERT_VALUES_EQUAL(res, input.ReadAll()); + + server.Stop(); + + return serverImpl.LastRequestSentSize(); + } + + Y_UNIT_TEST(TestHttpOutputSize) { + TString res = "qqqqqq"; + UNIT_ASSERT_VALUES_EQUAL(res.size(), DoTestHttpOutputSize(res, false)); + UNIT_ASSERT_VALUES_UNEQUAL(res.size(), DoTestHttpOutputSize(res, true)); + } +} // THttpStreamTest suite diff --git a/library/cpp/http/io/stream_ut_medium.cpp b/library/cpp/http/io/stream_ut_medium.cpp new file mode 100644 index 0000000000..2c125eb21e --- /dev/null +++ b/library/cpp/http/io/stream_ut_medium.cpp @@ -0,0 +1,54 @@ +#include "stream.h" +#include <library/cpp/testing/unittest/registar.h> +#include <util/stream/zlib.h> + +Y_UNIT_TEST_SUITE(THttpTestMedium) { + Y_UNIT_TEST(TestCodings2) { + TStringBuf data = "aaaaaaaaaaaaaaaaaaaaaaa"; + + for (auto codec : SupportedCodings()) { + if (codec == TStringBuf("z-zlib-0")) { + continue; + } + + if (codec == TStringBuf("z-null")) { + continue; + } + + TString s; + + { + TStringOutput so(s); + THttpOutput ho(&so); + TBufferedOutput bo(&ho, 10000); + + bo << "HTTP/1.1 200 Ok\r\n" + << "Connection: close\r\n" + << "Content-Encoding: " << codec << "\r\n\r\n"; + + for (size_t i = 0; i < 100; ++i) { + bo << data; + } + } + + try { + UNIT_ASSERT(s.size() > 10); + UNIT_ASSERT(s.find(data) == TString::npos); + } catch (...) { + Cerr << codec << " " << s << Endl; + + throw; + } + + { + TStringInput si(s); + THttpInput hi(&si); + + auto res = hi.ReadAll(); + + UNIT_ASSERT(res.find(data) == 0); + } + } + } + +} // THttpTestMedium suite diff --git a/library/cpp/http/io/ut/medium/ya.make b/library/cpp/http/io/ut/medium/ya.make new file mode 100644 index 0000000000..235a23dcd7 --- /dev/null +++ b/library/cpp/http/io/ut/medium/ya.make @@ -0,0 +1,11 @@ +UNITTEST_FOR(library/cpp/http/io) + +SIZE(MEDIUM) + +OWNER(g:util) + +SRCS( + stream_ut_medium.cpp +) + +END() diff --git a/library/cpp/http/io/ut/ya.make b/library/cpp/http/io/ut/ya.make new file mode 100644 index 0000000000..84f6949db3 --- /dev/null +++ b/library/cpp/http/io/ut/ya.make @@ -0,0 +1,16 @@ +UNITTEST_FOR(library/cpp/http/io) + +OWNER(g:util) + +PEERDIR( + library/cpp/http/server +) + +SRCS( + chunk_ut.cpp + compression_ut.cpp + headers_ut.cpp + stream_ut.cpp +) + +END() diff --git a/library/cpp/http/io/ya.make b/library/cpp/http/io/ya.make new file mode 100644 index 0000000000..dcfbd79885 --- /dev/null +++ b/library/cpp/http/io/ya.make @@ -0,0 +1,22 @@ +LIBRARY() + +OWNER( + g:util + mvel +) + +PEERDIR( + library/cpp/blockcodecs + library/cpp/streams/brotli + library/cpp/streams/bzip2 + library/cpp/streams/lzma +) + +SRCS( + chunk.cpp + compression.cpp + headers.cpp + stream.cpp +) + +END() |