aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/http/io
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/http/io
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/http/io')
-rw-r--r--library/cpp/http/io/chunk.cpp246
-rw-r--r--library/cpp/http/io/chunk.h47
-rw-r--r--library/cpp/http/io/chunk_ut.cpp105
-rw-r--r--library/cpp/http/io/compression.cpp66
-rw-r--r--library/cpp/http/io/compression.h72
-rw-r--r--library/cpp/http/io/compression_ut.cpp60
-rw-r--r--library/cpp/http/io/fuzz/main.cpp15
-rw-r--r--library/cpp/http/io/fuzz/ya.make18
-rw-r--r--library/cpp/http/io/headers.cpp108
-rw-r--r--library/cpp/http/io/headers.h125
-rw-r--r--library/cpp/http/io/headers_ut.cpp176
-rw-r--r--library/cpp/http/io/list_codings/main.cpp8
-rw-r--r--library/cpp/http/io/list_codings/ya.make13
-rw-r--r--library/cpp/http/io/stream.cpp1005
-rw-r--r--library/cpp/http/io/stream.h178
-rw-r--r--library/cpp/http/io/stream_ut.cpp732
-rw-r--r--library/cpp/http/io/stream_ut_medium.cpp54
-rw-r--r--library/cpp/http/io/ut/medium/ya.make11
-rw-r--r--library/cpp/http/io/ut/ya.make16
-rw-r--r--library/cpp/http/io/ya.make22
20 files changed, 3077 insertions, 0 deletions
diff --git a/library/cpp/http/io/chunk.cpp b/library/cpp/http/io/chunk.cpp
new file mode 100644
index 0000000000..6975d9eac1
--- /dev/null
+++ b/library/cpp/http/io/chunk.cpp
@@ -0,0 +1,246 @@
+#include "chunk.h"
+
+#include "headers.h"
+
+#include <util/string/cast.h>
+#include <util/generic/utility.h>
+#include <util/generic/yexception.h>
+
+static inline size_t ParseHex(const TString& s) {
+ if (s.empty()) {
+ ythrow yexception() << "can not parse chunk length(empty string)";
+ }
+
+ size_t ret = 0;
+
+ for (TString::const_iterator c = s.begin(); c != s.end(); ++c) {
+ const char ch = *c;
+
+ if (ch >= '0' && ch <= '9') {
+ ret *= 16;
+ ret += ch - '0';
+ } else if (ch >= 'a' && ch <= 'f') {
+ ret *= 16;
+ ret += 10 + ch - 'a';
+ } else if (ch >= 'A' && ch <= 'F') {
+ ret *= 16;
+ ret += 10 + ch - 'A';
+ } else if (ch == ';') {
+ break;
+ } else if (isspace(ch)) {
+ continue;
+ } else {
+ ythrow yexception() << "can not parse chunk length(" << s.data() << ")";
+ }
+ }
+
+ return ret;
+}
+
+static inline char* ToHex(size_t len, char* buf) {
+ do {
+ const size_t val = len % 16;
+
+ *--buf = (val < 10) ? (val + '0') : (val - 10 + 'a');
+ len /= 16;
+ } while (len);
+
+ return buf;
+}
+
+class TChunkedInput::TImpl {
+public:
+ inline TImpl(IInputStream* slave, TMaybe<THttpHeaders>* trailers)
+ : Slave_(slave)
+ , Trailers_(trailers)
+ , Pending_(0)
+ , LastChunkReaded_(false)
+ {
+ if (Trailers_) {
+ Trailers_->Clear();
+ }
+ }
+
+ inline ~TImpl() {
+ }
+
+ inline size_t Read(void* buf, size_t len) {
+ return Perform(len, [this, buf](size_t toRead) { return Slave_->Read(buf, toRead); });
+ }
+
+ inline size_t Skip(size_t len) {
+ return Perform(len, [this](size_t toSkip) { return Slave_->Skip(toSkip); });
+ }
+
+private:
+ template <class Operation>
+ inline size_t Perform(size_t len, const Operation& operation) {
+ if (!HavePendingData()) {
+ return 0;
+ }
+
+ const size_t toProcess = Min(Pending_, len);
+
+ if (toProcess) {
+ const size_t processed = operation(toProcess);
+
+ if (!processed) {
+ ythrow yexception() << "malformed http chunk";
+ }
+
+ Pending_ -= processed;
+
+ return processed;
+ }
+
+ return 0;
+ }
+
+ inline bool HavePendingData() {
+ if (LastChunkReaded_) {
+ return false;
+ }
+
+ if (!Pending_) {
+ if (!ProceedToNextChunk()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ inline bool ProceedToNextChunk() {
+ TString len(Slave_->ReadLine());
+
+ if (len.empty()) {
+ /*
+ * skip crlf from previous chunk
+ */
+
+ len = Slave_->ReadLine();
+ }
+
+ Pending_ = ParseHex(len);
+
+ if (Pending_) {
+ return true;
+ }
+
+ if (Trailers_) {
+ Trailers_->ConstructInPlace(Slave_);
+ }
+ LastChunkReaded_ = true;
+
+ return false;
+ }
+
+private:
+ IInputStream* Slave_;
+ TMaybe<THttpHeaders>* Trailers_;
+ size_t Pending_;
+ bool LastChunkReaded_;
+};
+
+TChunkedInput::TChunkedInput(IInputStream* slave, TMaybe<THttpHeaders>* trailers)
+ : Impl_(new TImpl(slave, trailers))
+{
+}
+
+TChunkedInput::~TChunkedInput() {
+}
+
+size_t TChunkedInput::DoRead(void* buf, size_t len) {
+ return Impl_->Read(buf, len);
+}
+
+size_t TChunkedInput::DoSkip(size_t len) {
+ return Impl_->Skip(len);
+}
+
+class TChunkedOutput::TImpl {
+ typedef IOutputStream::TPart TPart;
+
+public:
+ inline TImpl(IOutputStream* slave)
+ : Slave_(slave)
+ {
+ }
+
+ inline ~TImpl() {
+ }
+
+ inline void Write(const void* buf, size_t len) {
+ const char* ptr = (const char*)buf;
+
+ while (len) {
+ const size_t portion = Min<size_t>(len, 1024 * 16);
+
+ WriteImpl(ptr, portion);
+
+ ptr += portion;
+ len -= portion;
+ }
+ }
+
+ inline void WriteImpl(const void* buf, size_t len) {
+ char tmp[32];
+ char* e = tmp + sizeof(tmp);
+ char* b = ToHex(len, e);
+
+ const TPart parts[] = {
+ TPart(b, e - b),
+ TPart::CrLf(),
+ TPart(buf, len),
+ TPart::CrLf(),
+ };
+
+ Slave_->Write(parts, sizeof(parts) / sizeof(*parts));
+ }
+
+ inline void Flush() {
+ Slave_->Flush();
+ }
+
+ inline void Finish() {
+ Slave_->Write("0\r\n\r\n", 5);
+
+ Flush();
+ }
+
+private:
+ IOutputStream* Slave_;
+};
+
+TChunkedOutput::TChunkedOutput(IOutputStream* slave)
+ : Impl_(new TImpl(slave))
+{
+}
+
+TChunkedOutput::~TChunkedOutput() {
+ try {
+ Finish();
+ } catch (...) {
+ }
+}
+
+void TChunkedOutput::DoWrite(const void* buf, size_t len) {
+ if (Impl_.Get()) {
+ Impl_->Write(buf, len);
+ } else {
+ ythrow yexception() << "can not write to finished stream";
+ }
+}
+
+void TChunkedOutput::DoFlush() {
+ if (Impl_.Get()) {
+ Impl_->Flush();
+ }
+}
+
+void TChunkedOutput::DoFinish() {
+ if (Impl_.Get()) {
+ Impl_->Finish();
+ Impl_.Destroy();
+ }
+}
diff --git a/library/cpp/http/io/chunk.h b/library/cpp/http/io/chunk.h
new file mode 100644
index 0000000000..88d89fafda
--- /dev/null
+++ b/library/cpp/http/io/chunk.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include <util/stream/output.h>
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+
+class THttpHeaders;
+
+/// @addtogroup Streams_Chunked
+/// @{
+/// Ввод данных порциями.
+/// @details Последовательное чтение блоков данных. Предполагается, что
+/// данные записаны в виде <длина блока><блок данных>.
+class TChunkedInput: public IInputStream {
+public:
+ /// Если передан указатель на trailers, то туда будут записаны HTTP trailer'ы (возможно пустые),
+ /// которые идут после чанков.
+ TChunkedInput(IInputStream* slave, TMaybe<THttpHeaders>* trailers = nullptr);
+ ~TChunkedInput() override;
+
+private:
+ size_t DoRead(void* buf, size_t len) override;
+ size_t DoSkip(size_t len) override;
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
+/// Вывод данных порциями.
+/// @details Вывод данных блоками в виде <длина блока><блок данных>. Если объем
+/// данных превышает 64K, они записываются в виде n блоков по 64K + то, что осталось.
+class TChunkedOutput: public IOutputStream {
+public:
+ TChunkedOutput(IOutputStream* slave);
+ ~TChunkedOutput() override;
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+ void DoFlush() override;
+ void DoFinish() override;
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+/// @}
diff --git a/library/cpp/http/io/chunk_ut.cpp b/library/cpp/http/io/chunk_ut.cpp
new file mode 100644
index 0000000000..da283f8568
--- /dev/null
+++ b/library/cpp/http/io/chunk_ut.cpp
@@ -0,0 +1,105 @@
+#include "chunk.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/stream/file.h>
+#include <util/system/tempfile.h>
+#include <util/stream/null.h>
+
+#define CDATA "./chunkedio"
+
+Y_UNIT_TEST_SUITE(TestChunkedIO) {
+ static const char test_data[] = "87s6cfbsudg cuisg s igasidftasiy tfrcua6s";
+
+ TString CombString(const TString& s, size_t chunkSize) {
+ TString result;
+ for (size_t pos = 0; pos < s.size(); pos += 2 * chunkSize)
+ result += s.substr(pos, chunkSize);
+ return result;
+ }
+
+ void WriteTestData(IOutputStream * stream, TString * contents) {
+ contents->clear();
+ for (size_t i = 0; i < sizeof(test_data); ++i) {
+ stream->Write(test_data, i);
+ contents->append(test_data, i);
+ }
+ }
+
+ void ReadInSmallChunks(IInputStream * stream, TString * contents) {
+ char buf[11];
+ size_t read = 0;
+
+ contents->clear();
+ do {
+ read = stream->Read(buf, sizeof(buf));
+ contents->append(buf, read);
+ } while (read > 0);
+ }
+
+ void ReadCombed(IInputStream * stream, TString * contents, size_t chunkSize) {
+ Y_ASSERT(chunkSize < 128);
+ char buf[128];
+
+ contents->clear();
+ while (true) {
+ size_t read = stream->Load(buf, chunkSize);
+ contents->append(buf, read);
+ if (read == 0)
+ break;
+
+ size_t toSkip = chunkSize;
+ size_t skipped = 0;
+ do {
+ skipped = stream->Skip(toSkip);
+ toSkip -= skipped;
+ } while (skipped != 0 && toSkip != 0);
+ }
+ }
+
+ Y_UNIT_TEST(TestChunkedIo) {
+ TTempFile tmpFile(CDATA);
+ TString tmp;
+
+ {
+ TUnbufferedFileOutput fo(CDATA);
+ TChunkedOutput co(&fo);
+ WriteTestData(&co, &tmp);
+ }
+
+ {
+ TUnbufferedFileInput fi(CDATA);
+ TChunkedInput ci(&fi);
+ TString r;
+
+ ReadInSmallChunks(&ci, &r);
+
+ UNIT_ASSERT_EQUAL(r, tmp);
+ }
+
+ {
+ TUnbufferedFileInput fi(CDATA);
+ TChunkedInput ci(&fi);
+ TString r;
+
+ ReadCombed(&ci, &r, 11);
+
+ UNIT_ASSERT_EQUAL(r, CombString(tmp, 11));
+ }
+ }
+
+ Y_UNIT_TEST(TestBadChunk) {
+ bool hasError = false;
+
+ try {
+ TString badChunk = "10\r\nqwerty";
+ TMemoryInput mi(badChunk.data(), badChunk.size());
+ TChunkedInput ci(&mi);
+ TransferData(&ci, &Cnull);
+ } catch (...) {
+ hasError = true;
+ }
+
+ UNIT_ASSERT(hasError);
+ }
+}
diff --git a/library/cpp/http/io/compression.cpp b/library/cpp/http/io/compression.cpp
new file mode 100644
index 0000000000..8fa1f62ae6
--- /dev/null
+++ b/library/cpp/http/io/compression.cpp
@@ -0,0 +1,66 @@
+#include "compression.h"
+
+#if defined(ENABLE_GPL)
+#include <library/cpp/streams/lz/lz.h>
+#endif
+
+#include <library/cpp/streams/brotli/brotli.h>
+#include <library/cpp/streams/lzma/lzma.h>
+#include <library/cpp/streams/bzip2/bzip2.h>
+
+#include <library/cpp/blockcodecs/stream.h>
+#include <library/cpp/blockcodecs/codecs.h>
+
+#include <util/stream/zlib.h>
+
+
+TCompressionCodecFactory::TCompressionCodecFactory() {
+ auto gzip = [](auto s) {
+ return MakeHolder<TZLibDecompress>(s);
+ };
+
+ Add("gzip", gzip, [](auto s) { return MakeHolder<TZLibCompress>(s, ZLib::GZip); });
+ Add("deflate", gzip, [](auto s) { return MakeHolder<TZLibCompress>(s, ZLib::ZLib); });
+ Add("br", [](auto s) { return MakeHolder<TBrotliDecompress>(s); }, [](auto s) { return MakeHolder<TBrotliCompress>(s, 4); });
+ Add("x-gzip", gzip, [](auto s) { return MakeHolder<TZLibCompress>(s, ZLib::GZip); });
+ Add("x-deflate", gzip, [](auto s) { return MakeHolder<TZLibCompress>(s, ZLib::ZLib); });
+
+#if defined(ENABLE_GPL)
+ const ui16 bs = 32 * 1024;
+
+ Add("y-lzo", [](auto s) { return MakeHolder<TLzoDecompress>(s); }, [bs](auto s) { return MakeHolder<TLazy<TLzoCompress> >(s, bs); });
+ Add("y-lzf", [](auto s) { return MakeHolder<TLzfDecompress>(s); }, [bs](auto s) { return MakeHolder<TLazy<TLzfCompress> >(s, bs); });
+ Add("y-lzq", [](auto s) { return MakeHolder<TLzqDecompress>(s); }, [bs](auto s) { return MakeHolder<TLazy<TLzqCompress> >(s, bs); });
+#endif
+
+ Add("y-bzip2", [](auto s) { return MakeHolder<TBZipDecompress>(s); }, [](auto s) { return MakeHolder<TBZipCompress>(s); });
+ Add("y-lzma", [](auto s) { return MakeHolder<TLzmaDecompress>(s); }, [](auto s) { return MakeHolder<TLzmaCompress>(s); });
+
+ for (auto codecName : NBlockCodecs::ListAllCodecs()) {
+ if (codecName.StartsWith("zstd06")) {
+ continue;
+ }
+
+ if (codecName.StartsWith("zstd08")) {
+ continue;
+ }
+
+ auto codec = NBlockCodecs::Codec(codecName);
+
+ auto enc = [codec](auto s) {
+ return MakeHolder<NBlockCodecs::TCodedOutput>(s, codec, 32 * 1024);
+ };
+
+ auto dec = [codec](auto s) {
+ return MakeHolder<NBlockCodecs::TDecodedInput>(s, codec);
+ };
+
+ Add(TString("z-") + codecName, dec, enc);
+ }
+}
+
+void TCompressionCodecFactory::Add(TStringBuf name, TDecoderConstructor d, TEncoderConstructor e) {
+ Strings_.emplace_back(name);
+ Codecs_[Strings_.back()] = TCodec{d, e};
+ BestCodecs_.emplace_back(Strings_.back());
+}
diff --git a/library/cpp/http/io/compression.h b/library/cpp/http/io/compression.h
new file mode 100644
index 0000000000..f16c4a18eb
--- /dev/null
+++ b/library/cpp/http/io/compression.h
@@ -0,0 +1,72 @@
+#pragma once
+
+#include "stream.h"
+
+#include <util/generic/deque.h>
+#include <util/generic/hash.h>
+
+class TCompressionCodecFactory {
+public:
+ using TDecoderConstructor = std::function<THolder<IInputStream>(IInputStream*)>;
+ using TEncoderConstructor = std::function<THolder<IOutputStream>(IOutputStream*)>;
+
+ TCompressionCodecFactory();
+
+ static inline TCompressionCodecFactory& Instance() noexcept {
+ return *SingletonWithPriority<TCompressionCodecFactory, 0>();
+ }
+
+ inline const TDecoderConstructor* FindDecoder(TStringBuf name) const {
+ if (auto codec = Codecs_.FindPtr(name)) {
+ return &codec->Decoder;
+ }
+
+ return nullptr;
+ }
+
+ inline const TEncoderConstructor* FindEncoder(TStringBuf name) const {
+ if (auto codec = Codecs_.FindPtr(name)) {
+ return &codec->Encoder;
+ }
+
+ return nullptr;
+ }
+
+ inline TArrayRef<const TStringBuf> GetBestCodecs() const {
+ return BestCodecs_;
+ }
+
+private:
+ void Add(TStringBuf name, TDecoderConstructor d, TEncoderConstructor e);
+
+ struct TCodec {
+ TDecoderConstructor Decoder;
+ TEncoderConstructor Encoder;
+ };
+
+ TDeque<TString> Strings_;
+ THashMap<TStringBuf, TCodec> Codecs_;
+ TVector<TStringBuf> BestCodecs_;
+};
+
+namespace NHttp {
+ template <typename F>
+ TString ChooseBestCompressionScheme(F accepted, TArrayRef<const TStringBuf> available) {
+ if (available.empty()) {
+ return "identity";
+ }
+
+ if (accepted("*")) {
+ return TString(available[0]);
+ }
+
+ for (const auto& coding : available) {
+ TString s(coding);
+ if (accepted(s)) {
+ return s;
+ }
+ }
+
+ return "identity";
+ }
+}
diff --git a/library/cpp/http/io/compression_ut.cpp b/library/cpp/http/io/compression_ut.cpp
new file mode 100644
index 0000000000..2f3d131f8c
--- /dev/null
+++ b/library/cpp/http/io/compression_ut.cpp
@@ -0,0 +1,60 @@
+#include "stream.h"
+#include "compression.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/testing/unittest/tests_data.h>
+
+#include <util/stream/zlib.h>
+#include <util/generic/hash_set.h>
+
+Y_UNIT_TEST_SUITE(THttpCompressionTest) {
+ static const TString DATA = "I'm a teapot";
+
+ Y_UNIT_TEST(TestGetBestCodecs) {
+ UNIT_ASSERT(TCompressionCodecFactory::Instance().GetBestCodecs().size() > 0);
+ }
+
+ Y_UNIT_TEST(TestEncoder) {
+ TStringStream buffer;
+
+ {
+ auto encoder = TCompressionCodecFactory::Instance().FindEncoder("gzip");
+ UNIT_ASSERT(encoder);
+
+ auto encodedStream = (*encoder)(&buffer);
+ encodedStream->Write(DATA);
+ }
+
+ TZLibDecompress decompressor(&buffer);
+ UNIT_ASSERT_EQUAL(decompressor.ReadAll(), DATA);
+ }
+
+ Y_UNIT_TEST(TestDecoder) {
+ TStringStream buffer;
+
+ {
+ TZLibCompress compressor(TZLibCompress::TParams(&buffer).SetType(ZLib::GZip));
+ compressor.Write(DATA);
+ }
+
+ auto decoder = TCompressionCodecFactory::Instance().FindDecoder("gzip");
+ UNIT_ASSERT(decoder);
+
+ auto decodedStream = (*decoder)(&buffer);
+ UNIT_ASSERT_EQUAL(decodedStream->ReadAll(), DATA);
+ }
+
+ Y_UNIT_TEST(TestChooseBestCompressionScheme) {
+ THashSet<TString> accepted;
+
+ auto checkAccepted = [&accepted](const TString& v) {
+ return accepted.contains(v);
+ };
+
+ UNIT_ASSERT_VALUES_EQUAL("identity", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"}));
+ accepted.insert("deflate");
+ UNIT_ASSERT_VALUES_EQUAL("deflate", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"}));
+ accepted.insert("*");
+ UNIT_ASSERT_VALUES_EQUAL("gzip", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"}));
+ }
+} // THttpCompressionTest suite
diff --git a/library/cpp/http/io/fuzz/main.cpp b/library/cpp/http/io/fuzz/main.cpp
new file mode 100644
index 0000000000..8ded9c7e32
--- /dev/null
+++ b/library/cpp/http/io/fuzz/main.cpp
@@ -0,0 +1,15 @@
+#include <library/cpp/http/io/stream.h>
+
+#include <util/generic/vector.h>
+#include <util/stream/mem.h>
+
+extern "C" int LLVMFuzzerTestOneInput(const ui8* data, size_t size) {
+ TMemoryInput mi(data, size);
+
+ try {
+ THttpInput(&mi).ReadAll();
+ } catch (...) {
+ }
+
+ return 0; // Non-zero return values are reserved for future use.
+}
diff --git a/library/cpp/http/io/fuzz/ya.make b/library/cpp/http/io/fuzz/ya.make
new file mode 100644
index 0000000000..8b3ccb1969
--- /dev/null
+++ b/library/cpp/http/io/fuzz/ya.make
@@ -0,0 +1,18 @@
+FUZZ()
+
+OWNER(
+ pg
+ g:util
+)
+
+PEERDIR(
+ library/cpp/http/io
+)
+
+SIZE(MEDIUM)
+
+SRCS(
+ main.cpp
+)
+
+END()
diff --git a/library/cpp/http/io/headers.cpp b/library/cpp/http/io/headers.cpp
new file mode 100644
index 0000000000..4ec27a29e8
--- /dev/null
+++ b/library/cpp/http/io/headers.cpp
@@ -0,0 +1,108 @@
+#include "headers.h"
+#include "stream.h"
+
+#include <util/generic/strbuf.h>
+#include <util/generic/yexception.h>
+#include <util/stream/output.h>
+#include <util/string/ascii.h>
+#include <util/string/cast.h>
+#include <util/string/strip.h>
+
+static inline TStringBuf Trim(const char* b, const char* e) noexcept {
+ return StripString(TStringBuf(b, e));
+}
+
+THttpInputHeader::THttpInputHeader(const TStringBuf header) {
+ size_t pos = header.find(':');
+
+ if (pos == TString::npos) {
+ ythrow THttpParseException() << "can not parse http header(" << TString{header}.Quote() << ")";
+ }
+
+ Name_ = TString(header.cbegin(), header.cbegin() + pos);
+ Value_ = ::ToString(Trim(header.cbegin() + pos + 1, header.cend()));
+}
+
+THttpInputHeader::THttpInputHeader(TString name, TString value)
+ : Name_(std::move(name))
+ , Value_(std::move(value))
+{
+}
+
+void THttpInputHeader::OutTo(IOutputStream* stream) const {
+ typedef IOutputStream::TPart TPart;
+
+ const TPart parts[] = {
+ TPart(Name_),
+ TPart(": ", 2),
+ TPart(Value_),
+ TPart::CrLf(),
+ };
+
+ stream->Write(parts, sizeof(parts) / sizeof(*parts));
+}
+
+THttpHeaders::THttpHeaders(IInputStream* stream) {
+ TString header;
+ TString line;
+
+ bool rdOk = stream->ReadLine(header);
+ while (rdOk && !header.empty()) {
+ rdOk = stream->ReadLine(line);
+
+ if (rdOk && ((line[0] == ' ') || (line[0] == '\t'))) {
+ header += line;
+ } else {
+ AddHeader(THttpInputHeader(header));
+ header = line;
+ }
+ }
+}
+
+bool THttpHeaders::HasHeader(const TStringBuf header) const {
+ return FindHeader(header);
+}
+
+const THttpInputHeader* THttpHeaders::FindHeader(const TStringBuf header) const {
+ for (const auto& hdr : Headers_) {
+ if (AsciiCompareIgnoreCase(hdr.Name(), header) == 0) {
+ return &hdr;
+ }
+ }
+ return nullptr;
+}
+
+void THttpHeaders::RemoveHeader(const TStringBuf header) {
+ for (auto h = Headers_.begin(); h != Headers_.end(); ++h) {
+ if (AsciiCompareIgnoreCase(h->Name(), header) == 0) {
+ Headers_.erase(h);
+ return;
+ }
+ }
+}
+
+void THttpHeaders::AddOrReplaceHeader(const THttpInputHeader& header) {
+ for (auto& hdr : Headers_) {
+ if (AsciiCompareIgnoreCase(hdr.Name(), header.Name()) == 0) {
+ hdr = header;
+ return;
+ }
+ }
+
+ AddHeader(header);
+}
+
+void THttpHeaders::AddHeader(THttpInputHeader header) {
+ Headers_.push_back(std::move(header));
+}
+
+void THttpHeaders::OutTo(IOutputStream* stream) const {
+ for (TConstIterator header = Begin(); header != End(); ++header) {
+ header->OutTo(stream);
+ }
+}
+
+template <>
+void Out<THttpHeaders>(IOutputStream& out, const THttpHeaders& h) {
+ h.OutTo(&out);
+}
diff --git a/library/cpp/http/io/headers.h b/library/cpp/http/io/headers.h
new file mode 100644
index 0000000000..a71793d1c6
--- /dev/null
+++ b/library/cpp/http/io/headers.h
@@ -0,0 +1,125 @@
+#pragma once
+
+#include <util/generic/string.h>
+#include <util/generic/strbuf.h>
+#include <util/generic/deque.h>
+#include <util/generic/vector.h>
+#include <util/string/cast.h>
+
+class IInputStream;
+class IOutputStream;
+
+/// @addtogroup Streams_HTTP
+/// @{
+/// Объект, содержащий информацию о HTTP-заголовке.
+class THttpInputHeader {
+public:
+ /// @param[in] header - строка вида 'параметр: значение'.
+ THttpInputHeader(TStringBuf header);
+ /// @param[in] name - имя параметра.
+ /// @param[in] value - значение параметра.
+ THttpInputHeader(TString name, TString value);
+
+ /// Возвращает имя параметра.
+ inline const TString& Name() const noexcept {
+ return Name_;
+ }
+
+ /// Возвращает значение параметра.
+ inline const TString& Value() const noexcept {
+ return Value_;
+ }
+
+ /// Записывает заголовок вида "имя параметра: значение\r\n" в поток.
+ void OutTo(IOutputStream* stream) const;
+
+ /// Возвращает строку "имя параметра: значение".
+ inline TString ToString() const {
+ return Name_ + TStringBuf(": ") + Value_;
+ }
+
+private:
+ TString Name_;
+ TString Value_;
+};
+
+/// Контейнер для хранения HTTP-заголовков
+class THttpHeaders {
+ using THeaders = TDeque<THttpInputHeader>;
+
+public:
+ using TConstIterator = THeaders::const_iterator;
+
+ THttpHeaders() = default;
+
+ /// Добавляет каждую строку из потока в контейнер, считая ее правильным заголовком.
+ THttpHeaders(IInputStream* stream);
+
+ /// Стандартный итератор.
+ inline TConstIterator Begin() const noexcept {
+ return Headers_.begin();
+ }
+ inline TConstIterator begin() const noexcept {
+ return Headers_.begin();
+ }
+
+ /// Стандартный итератор.
+ inline TConstIterator End() const noexcept {
+ return Headers_.end();
+ }
+ inline TConstIterator end() const noexcept {
+ return Headers_.end();
+ }
+
+ /// Возвращает количество заголовков в контейнере.
+ inline size_t Count() const noexcept {
+ return Headers_.size();
+ }
+
+ /// Проверяет, содержит ли контейнер хотя бы один заголовок.
+ inline bool Empty() const noexcept {
+ return Headers_.empty();
+ }
+
+ /// Добавляет заголовок в контейнер.
+ void AddHeader(THttpInputHeader header);
+
+ template <typename ValueType>
+ void AddHeader(TString name, const ValueType& value) {
+ AddHeader(THttpInputHeader(std::move(name), ToString(value)));
+ }
+
+ /// Добавляет заголовок в контейнер, если тот не содержит заголовка
+ /// c таким же параметром. В противном случае, заменяет существующий
+ /// заголовок на новый.
+ void AddOrReplaceHeader(const THttpInputHeader& header);
+
+ template <typename ValueType>
+ void AddOrReplaceHeader(TString name, const ValueType& value) {
+ AddOrReplaceHeader(THttpInputHeader(std::move(name), ToString(value)));
+ }
+
+ // Проверяет, есть ли такой заголовок
+ bool HasHeader(TStringBuf header) const;
+
+ /// Удаляет заголовок, если он есть.
+ void RemoveHeader(TStringBuf header);
+
+ /// Ищет заголовок по указанному имени
+ /// Возвращает nullptr, если не нашел
+ const THttpInputHeader* FindHeader(TStringBuf header) const;
+
+ /// Записывает все заголовки контейнера в поток.
+ /// @details Каждый заголовк записывается в виде "имя параметра: значение\r\n".
+ void OutTo(IOutputStream* stream) const;
+
+ /// Обменивает наборы заголовков двух контейнеров.
+ void Swap(THttpHeaders& headers) noexcept {
+ Headers_.swap(headers.Headers_);
+ }
+
+private:
+ THeaders Headers_;
+};
+
+/// @}
diff --git a/library/cpp/http/io/headers_ut.cpp b/library/cpp/http/io/headers_ut.cpp
new file mode 100644
index 0000000000..1d23ef8fdc
--- /dev/null
+++ b/library/cpp/http/io/headers_ut.cpp
@@ -0,0 +1,176 @@
+#include <util/generic/set.h>
+#include <util/generic/string.h>
+#include <util/generic/strbuf.h>
+#include <utility>
+
+#include <library/cpp/http/io/headers.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace {
+ class THeadersExistence {
+ public:
+ THeadersExistence() = default;
+
+ THeadersExistence(const THttpHeaders& headers) {
+ for (THttpHeaders::TConstIterator it = headers.Begin();
+ it != headers.End();
+ ++it) {
+ Add(it->Name(), it->Value());
+ }
+ }
+
+ public:
+ void Add(TStringBuf name, TStringBuf value) {
+ Impl.emplace(TString(name), TString(value));
+ }
+
+ bool operator==(const THeadersExistence& rhs) const {
+ return Impl == rhs.Impl;
+ }
+
+ private:
+ typedef TMultiSet<std::pair<TString, TString>> TImpl;
+ TImpl Impl;
+ };
+}
+
+bool operator==(const THeadersExistence& lhs, const THttpHeaders& rhs) {
+ return lhs == THeadersExistence(rhs);
+}
+
+bool operator==(const THttpHeaders& lhs, const THeadersExistence& rhs) {
+ return THeadersExistence(lhs) == rhs;
+}
+
+class THttpHeadersTest: public TTestBase {
+ UNIT_TEST_SUITE(THttpHeadersTest);
+ UNIT_TEST(TestAddOperation1Arg);
+ UNIT_TEST(TestAddOperation2Args);
+ UNIT_TEST(TestAddOrReplaceOperation1Arg);
+ UNIT_TEST(TestAddOrReplaceOperation2Args);
+ UNIT_TEST(TestAddHeaderTemplateness);
+ UNIT_TEST(TestFindHeader);
+ UNIT_TEST_SUITE_END();
+
+private:
+ typedef void (*TAddHeaderFunction)(THttpHeaders&, TStringBuf name, TStringBuf value);
+ typedef void (*TAddOrReplaceHeaderFunction)(THttpHeaders&, TStringBuf name, TStringBuf value);
+
+public:
+ void TestAddOperation1Arg();
+ void TestAddOperation2Args();
+ void TestAddOrReplaceOperation1Arg();
+ void TestAddOrReplaceOperation2Args();
+ void TestAddHeaderTemplateness();
+ void TestFindHeader();
+
+private:
+ static void AddHeaderImpl1Arg(THttpHeaders& headers, TStringBuf name, TStringBuf value) {
+ headers.AddHeader(THttpInputHeader(TString(name), TString(value)));
+ }
+
+ static void AddHeaderImpl2Args(THttpHeaders& headers, TStringBuf name, TStringBuf value) {
+ headers.AddHeader(TString(name), TString(value));
+ }
+
+ static void AddOrReplaceHeaderImpl1Arg(THttpHeaders& headers, TStringBuf name, TStringBuf value) {
+ headers.AddOrReplaceHeader(THttpInputHeader(TString(name), TString(value)));
+ }
+
+ static void AddOrReplaceHeaderImpl2Args(THttpHeaders& headers, TStringBuf name, TStringBuf value) {
+ headers.AddOrReplaceHeader(TString(name), TString(value));
+ }
+
+ void DoTestAddOperation(TAddHeaderFunction);
+ void DoTestAddOrReplaceOperation(TAddHeaderFunction, TAddOrReplaceHeaderFunction);
+};
+
+UNIT_TEST_SUITE_REGISTRATION(THttpHeadersTest);
+
+void THttpHeadersTest::TestAddOperation1Arg() {
+ DoTestAddOperation(AddHeaderImpl1Arg);
+}
+void THttpHeadersTest::TestAddOperation2Args() {
+ DoTestAddOperation(AddHeaderImpl2Args);
+}
+
+void THttpHeadersTest::TestAddOrReplaceOperation1Arg() {
+ DoTestAddOrReplaceOperation(AddHeaderImpl1Arg, AddOrReplaceHeaderImpl1Arg);
+}
+void THttpHeadersTest::TestAddOrReplaceOperation2Args() {
+ DoTestAddOrReplaceOperation(AddHeaderImpl2Args, AddOrReplaceHeaderImpl2Args);
+}
+
+void THttpHeadersTest::DoTestAddOperation(TAddHeaderFunction addHeader) {
+ THttpHeaders h1;
+
+ addHeader(h1, "h1", "v1");
+ addHeader(h1, "h2", "v1");
+
+ addHeader(h1, "h3", "v1");
+ addHeader(h1, "h3", "v2");
+ addHeader(h1, "h3", "v2");
+
+ THeadersExistence h2;
+
+ h2.Add("h1", "v1");
+ h2.Add("h2", "v1");
+
+ h2.Add("h3", "v1");
+ h2.Add("h3", "v2");
+ h2.Add("h3", "v2");
+
+ UNIT_ASSERT(h2 == h1);
+}
+
+// Sorry, but AddOrReplaceHeader replaces only first occurence
+void THttpHeadersTest::DoTestAddOrReplaceOperation(TAddHeaderFunction addHeader, TAddOrReplaceHeaderFunction addOrReplaceHeader) {
+ THttpHeaders h1;
+
+ addHeader(h1, "h1", "v1");
+
+ addOrReplaceHeader(h1, "h2", "v1");
+ addOrReplaceHeader(h1, "h2", "v2");
+ addOrReplaceHeader(h1, "h2", "v3");
+ addHeader(h1, "h2", "v4");
+
+ addHeader(h1, "h3", "v1");
+ addHeader(h1, "h3", "v2");
+ addOrReplaceHeader(h1, "h3", "v3");
+
+ THeadersExistence h2;
+
+ h2.Add("h1", "v1");
+
+ h2.Add("h2", "v3");
+ h2.Add("h2", "v4");
+
+ h2.Add("h3", "v2");
+ h2.Add("h3", "v3");
+
+ UNIT_ASSERT(h2 == h1);
+}
+
+void THttpHeadersTest::TestAddHeaderTemplateness() {
+ THttpHeaders h1;
+ h1.AddHeader("h1", "v1");
+ h1.AddHeader("h2", TString("v2"));
+ h1.AddHeader("h3", TStringBuf("v3"));
+ h1.AddHeader("h4", TStringBuf("v4"));
+
+ THeadersExistence h2;
+ h2.Add("h1", "v1");
+ h2.Add("h2", "v2");
+ h2.Add("h3", "v3");
+ h2.Add("h4", "v4");
+
+ UNIT_ASSERT(h1 == h2);
+}
+
+void THttpHeadersTest::TestFindHeader() {
+ THttpHeaders sut;
+ sut.AddHeader("NaMe", "Value");
+
+ UNIT_ASSERT(sut.FindHeader("name"));
+ UNIT_ASSERT(sut.FindHeader("name")->Value() == "Value");
+}
diff --git a/library/cpp/http/io/list_codings/main.cpp b/library/cpp/http/io/list_codings/main.cpp
new file mode 100644
index 0000000000..9818d02bdf
--- /dev/null
+++ b/library/cpp/http/io/list_codings/main.cpp
@@ -0,0 +1,8 @@
+#include <library/cpp/http/io/stream.h>
+#include <util/stream/output.h>
+
+int main() {
+ for (auto codec : SupportedCodings()) {
+ Cout << codec << Endl;
+ }
+}
diff --git a/library/cpp/http/io/list_codings/ya.make b/library/cpp/http/io/list_codings/ya.make
new file mode 100644
index 0000000000..e5c5fed6dc
--- /dev/null
+++ b/library/cpp/http/io/list_codings/ya.make
@@ -0,0 +1,13 @@
+PROGRAM()
+
+OWNER(pg)
+
+PEERDIR(
+ library/cpp/http/io
+)
+
+SRCS(
+ main.cpp
+)
+
+END()
diff --git a/library/cpp/http/io/stream.cpp b/library/cpp/http/io/stream.cpp
new file mode 100644
index 0000000000..6689be684f
--- /dev/null
+++ b/library/cpp/http/io/stream.cpp
@@ -0,0 +1,1005 @@
+#include "stream.h"
+
+#include "compression.h"
+#include "chunk.h"
+
+#include <util/stream/buffered.h>
+#include <util/stream/length.h>
+#include <util/stream/multi.h>
+#include <util/stream/null.h>
+#include <util/stream/tee.h>
+
+#include <util/system/compat.h>
+#include <util/system/yassert.h>
+
+#include <util/network/socket.h>
+
+#include <util/string/cast.h>
+#include <util/string/strip.h>
+
+#include <util/generic/string.h>
+#include <util/generic/utility.h>
+#include <util/generic/hash_set.h>
+#include <util/generic/yexception.h>
+
+#define HEADERCMP(header, str) \
+ case sizeof(str) - 1: \
+ if (!stricmp((header).Name().data(), str))
+
+namespace {
+ inline size_t SuggestBufferSize() {
+ return 8192;
+ }
+
+ inline TStringBuf Trim(const char* b, const char* e) noexcept {
+ return StripString(TStringBuf(b, e));
+ }
+
+ inline TStringBuf RmSemiColon(const TStringBuf& s) {
+ return s.Before(';');
+ }
+
+ template <class T, size_t N>
+ class TStreams: private TNonCopyable {
+ struct TDelete {
+ inline void operator()(T* t) noexcept {
+ delete t;
+ }
+ };
+
+ typedef T* TPtr;
+
+ public:
+ inline TStreams() noexcept
+ : Beg_(T_ + N)
+ {
+ }
+
+ inline ~TStreams() {
+ TDelete f;
+
+ ForEach(f);
+ }
+
+ template <class S>
+ inline S* Add(S* t) noexcept {
+ return (S*)AddImpl((T*)t);
+ }
+
+ template <class Functor>
+ inline void ForEach(Functor& f) {
+ const TPtr* end = T_ + N;
+
+ for (TPtr* cur = Beg_; cur != end; ++cur) {
+ f(*cur);
+ }
+ }
+
+ TPtr Top() {
+ const TPtr* end = T_ + N;
+ return end == Beg_ ? nullptr : *Beg_;
+ }
+
+ private:
+ inline T* AddImpl(T* t) noexcept {
+ Y_ASSERT(Beg_ > T_);
+
+ return (*--Beg_ = t);
+ }
+
+ private:
+ TPtr T_[N];
+ TPtr* Beg_;
+ };
+
+ template <class TStream>
+ class TLazy: public IOutputStream {
+ public:
+ TLazy(IOutputStream* out, ui16 bs)
+ : Output_(out)
+ , BlockSize_(bs)
+ {
+ }
+
+ void DoWrite(const void* buf, size_t len) override {
+ ConstructSlave();
+ Slave_->Write(buf, len);
+ }
+
+ void DoFlush() override {
+ ConstructSlave();
+ Slave_->Flush();
+ }
+
+ void DoFinish() override {
+ ConstructSlave();
+ Slave_->Finish();
+ }
+
+ private:
+ inline void ConstructSlave() {
+ if (!Slave_) {
+ Slave_.Reset(new TStream(Output_, BlockSize_));
+ }
+ }
+
+ private:
+ IOutputStream* Output_;
+ ui16 BlockSize_;
+ THolder<IOutputStream> Slave_;
+ };
+}
+
+class THttpInput::TImpl {
+ typedef THashSet<TString> TAcceptCodings;
+
+public:
+ inline TImpl(IInputStream* slave)
+ : Slave_(slave)
+ , Buffered_(Slave_, SuggestBufferSize())
+ , ChunkedInput_(nullptr)
+ , Input_(nullptr)
+ , FirstLine_(ReadFirstLine(Buffered_))
+ , Headers_(&Buffered_)
+ , KeepAlive_(false)
+ , HasContentLength_(false)
+ , ContentLength_(0)
+ , ContentEncoded_(false)
+ , Expect100Continue_(false)
+ {
+ BuildInputChain();
+ Y_ASSERT(Input_);
+ }
+
+ static TString ReadFirstLine(TBufferedInput& in) {
+ TString s;
+ Y_ENSURE_EX(in.ReadLine(s), THttpReadException() << "Failed to get first line");
+ return s;
+ }
+
+ inline ~TImpl() {
+ }
+
+ inline size_t Read(void* buf, size_t len) {
+ return Perform(len, [this, buf](size_t toRead) { return Input_->Read(buf, toRead); });
+ }
+
+ inline size_t Skip(size_t len) {
+ return Perform(len, [this](size_t toSkip) { return Input_->Skip(toSkip); });
+ }
+
+ inline const TString& FirstLine() const noexcept {
+ return FirstLine_;
+ }
+
+ inline const THttpHeaders& Headers() const noexcept {
+ return Headers_;
+ }
+
+ inline const TMaybe<THttpHeaders>& Trailers() const noexcept {
+ return Trailers_;
+ }
+
+ inline bool IsKeepAlive() const noexcept {
+ return KeepAlive_;
+ }
+
+ inline bool AcceptEncoding(const TString& s) const {
+ return Codings_.find(to_lower(s)) != Codings_.end();
+ }
+
+ inline bool GetContentLength(ui64& value) const noexcept {
+ if (HasContentLength_) {
+ value = ContentLength_;
+ return true;
+ }
+ return false;
+ }
+
+ inline bool ContentEncoded() const noexcept {
+ return ContentEncoded_;
+ }
+
+ inline bool HasContent() const noexcept {
+ return HasContentLength_ || ChunkedInput_;
+ }
+
+ inline bool HasExpect100Continue() const noexcept {
+ return Expect100Continue_;
+ }
+
+private:
+ template <class Operation>
+ inline size_t Perform(size_t len, const Operation& operation) {
+ size_t processed = operation(len);
+ if (processed == 0 && len > 0) {
+ if (!ChunkedInput_) {
+ Trailers_.ConstructInPlace();
+ } else {
+ // Read the header of the trailing chunk. It remains in
+ // the TChunkedInput stream if the HTTP response is compressed.
+ char symbol;
+ if (ChunkedInput_->Read(&symbol, 1) != 0) {
+ ythrow THttpParseException() << "some data remaining in TChunkedInput";
+ }
+ }
+ }
+ return processed;
+ }
+
+ struct TParsedHeaders {
+ bool Chunked = false;
+ bool KeepAlive = false;
+ TStringBuf LZipped;
+ };
+
+ struct TTrEnc {
+ inline void operator()(const TStringBuf& s) {
+ if (s == TStringBuf("chunked")) {
+ p->Chunked = true;
+ }
+ }
+
+ TParsedHeaders* p;
+ };
+
+ struct TAccCoding {
+ inline void operator()(const TStringBuf& s) {
+ c->insert(ToString(s));
+ }
+
+ TAcceptCodings* c;
+ };
+
+ template <class Functor>
+ inline void ForEach(TString in, Functor& f) {
+ in.to_lower();
+
+ const char* b = in.begin();
+ const char* c = b;
+ const char* e = in.end();
+
+ while (c != e) {
+ if (*c == ',') {
+ f(RmSemiColon(Trim(b, c)));
+ b = c + 1;
+ }
+
+ ++c;
+ }
+
+ if (b != c) {
+ f(RmSemiColon(Trim(b, c)));
+ }
+ }
+
+ inline bool IsRequest() const {
+ return strnicmp(FirstLine().data(), "get", 3) == 0 ||
+ strnicmp(FirstLine().data(), "post", 4) == 0 ||
+ strnicmp(FirstLine().data(), "put", 3) == 0 ||
+ strnicmp(FirstLine().data(), "patch", 5) == 0 ||
+ strnicmp(FirstLine().data(), "head", 4) == 0 ||
+ strnicmp(FirstLine().data(), "delete", 6) == 0;
+ }
+
+ inline void BuildInputChain() {
+ TParsedHeaders p;
+
+ size_t pos = FirstLine_.rfind(' ');
+ // In HTTP/1.1 Keep-Alive is turned on by default
+ if (pos != TString::npos && strcmp(FirstLine_.c_str() + pos + 1, "HTTP/1.1") == 0) {
+ p.KeepAlive = true; //request
+ } else if (strnicmp(FirstLine_.data(), "HTTP/1.1", 8) == 0) {
+ p.KeepAlive = true; //reply
+ }
+
+ for (THttpHeaders::TConstIterator h = Headers_.Begin(); h != Headers_.End(); ++h) {
+ const THttpInputHeader& header = *h;
+ switch (header.Name().size()) {
+ HEADERCMP(header, "transfer-encoding") {
+ TTrEnc f = {&p};
+ ForEach(header.Value(), f);
+ }
+ break;
+ HEADERCMP(header, "content-encoding") {
+ p.LZipped = header.Value();
+ }
+ break;
+ HEADERCMP(header, "accept-encoding") {
+ TAccCoding f = {&Codings_};
+ ForEach(header.Value(), f);
+ }
+ break;
+ HEADERCMP(header, "content-length") {
+ HasContentLength_ = true;
+ ContentLength_ = FromString(header.Value());
+ }
+ break;
+ HEADERCMP(header, "connection") {
+ // accept header "Connection: Keep-Alive, TE"
+ if (strnicmp(header.Value().data(), "keep-alive", 10) == 0) {
+ p.KeepAlive = true;
+ } else if (stricmp(header.Value().data(), "close") == 0) {
+ p.KeepAlive = false;
+ }
+ }
+ [[fallthrough]];
+ HEADERCMP(header, "expect") {
+ auto findContinue = [&](const TStringBuf& s) {
+ if (strnicmp(s.data(), "100-continue", 13) == 0) {
+ Expect100Continue_ = true;
+ }
+ };
+ ForEach(header.Value(), findContinue);
+ }
+ break;
+ }
+ }
+
+ if (p.Chunked) {
+ ChunkedInput_ = Streams_.Add(new TChunkedInput(&Buffered_, &Trailers_));
+ Input_ = ChunkedInput_;
+ } else {
+ // disable buffering
+ Buffered_.Reset(&Cnull);
+ Input_ = Streams_.Add(new TMultiInput(&Buffered_, Slave_));
+
+ if (IsRequest() || HasContentLength_) {
+ /*
+ * TODO - we have other cases
+ */
+ Input_ = Streams_.Add(new TLengthLimitedInput(Input_, ContentLength_));
+ }
+ }
+
+ if (auto decoder = TCompressionCodecFactory::Instance().FindDecoder(p.LZipped)) {
+ ContentEncoded_ = true;
+ Input_ = Streams_.Add((*decoder)(Input_).Release());
+ }
+
+ KeepAlive_ = p.KeepAlive;
+ }
+
+private:
+ IInputStream* Slave_;
+
+ /*
+ * input helpers
+ */
+ TBufferedInput Buffered_;
+ TStreams<IInputStream, 8> Streams_;
+ IInputStream* ChunkedInput_;
+
+ /*
+ * final input stream
+ */
+ IInputStream* Input_;
+
+ TString FirstLine_;
+ THttpHeaders Headers_;
+ TMaybe<THttpHeaders> Trailers_;
+ bool KeepAlive_;
+
+ TAcceptCodings Codings_;
+
+ bool HasContentLength_;
+ ui64 ContentLength_;
+
+ bool ContentEncoded_;
+ bool Expect100Continue_;
+};
+
+THttpInput::THttpInput(IInputStream* slave)
+ : Impl_(new TImpl(slave))
+{
+}
+
+THttpInput::THttpInput(THttpInput&& httpInput) = default;
+
+THttpInput::~THttpInput() {
+}
+
+size_t THttpInput::DoRead(void* buf, size_t len) {
+ return Impl_->Read(buf, len);
+}
+
+size_t THttpInput::DoSkip(size_t len) {
+ return Impl_->Skip(len);
+}
+
+const THttpHeaders& THttpInput::Headers() const noexcept {
+ return Impl_->Headers();
+}
+
+const TMaybe<THttpHeaders>& THttpInput::Trailers() const noexcept {
+ return Impl_->Trailers();
+}
+
+const TString& THttpInput::FirstLine() const noexcept {
+ return Impl_->FirstLine();
+}
+
+bool THttpInput::IsKeepAlive() const noexcept {
+ return Impl_->IsKeepAlive();
+}
+
+bool THttpInput::AcceptEncoding(const TString& coding) const {
+ return Impl_->AcceptEncoding(coding);
+}
+
+TString THttpInput::BestCompressionScheme(TArrayRef<const TStringBuf> codings) const {
+ return NHttp::ChooseBestCompressionScheme(
+ [this](const TString& coding) {
+ return AcceptEncoding(coding);
+ },
+ codings
+ );
+}
+
+TString THttpInput::BestCompressionScheme() const {
+ return BestCompressionScheme(TCompressionCodecFactory::Instance().GetBestCodecs());
+}
+
+bool THttpInput::GetContentLength(ui64& value) const noexcept {
+ return Impl_->GetContentLength(value);
+}
+
+bool THttpInput::ContentEncoded() const noexcept {
+ return Impl_->ContentEncoded();
+}
+
+bool THttpInput::HasContent() const noexcept {
+ return Impl_->HasContent();
+}
+
+bool THttpInput::HasExpect100Continue() const noexcept {
+ return Impl_->HasExpect100Continue();
+}
+
+class THttpOutput::TImpl {
+ class TSizeCalculator: public IOutputStream {
+ public:
+ inline TSizeCalculator() noexcept {
+ }
+
+ ~TSizeCalculator() override {
+ }
+
+ void DoWrite(const void* /*buf*/, size_t len) override {
+ Length_ += len;
+ }
+
+ inline size_t Length() const noexcept {
+ return Length_;
+ }
+
+ private:
+ size_t Length_ = 0;
+ };
+
+ enum TState {
+ Begin = 0,
+ FirstLineSent = 1,
+ HeadersSent = 2
+ };
+
+ struct TFlush {
+ inline void operator()(IOutputStream* s) {
+ s->Flush();
+ }
+ };
+
+ struct TFinish {
+ inline void operator()(IOutputStream* s) {
+ s->Finish();
+ }
+ };
+
+public:
+ inline TImpl(IOutputStream* slave, THttpInput* request)
+ : Slave_(slave)
+ , State_(Begin)
+ , Output_(Slave_)
+ , Request_(request)
+ , Version_(1100)
+ , KeepAliveEnabled_(false)
+ , BodyEncodingEnabled_(true)
+ , CompressionHeaderEnabled_(true)
+ , Finished_(false)
+ {
+ }
+
+ inline ~TImpl() {
+ }
+
+ inline void SendContinue() {
+ Output_->Write("HTTP/1.1 100 Continue\r\n\r\n");
+ Output_->Flush();
+ }
+
+ inline void Write(const void* buf, size_t len) {
+ if (Finished_) {
+ ythrow THttpException() << "can not write to finished stream";
+ }
+
+ if (State_ == HeadersSent) {
+ Output_->Write(buf, len);
+
+ return;
+ }
+
+ const char* b = (const char*)buf;
+ const char* e = b + len;
+ const char* c = b;
+
+ while (c != e) {
+ if (*c == '\n') {
+ Line_.append(b, c);
+
+ if (!Line_.empty() && Line_.back() == '\r') {
+ Line_.pop_back();
+ }
+
+ b = c + 1;
+
+ Process(Line_);
+
+ if (State_ == HeadersSent) {
+ Output_->Write(b, e - b);
+
+ return;
+ }
+
+ Line_.clear();
+ }
+
+ ++c;
+ }
+
+ if (b != c) {
+ Line_.append(b, c);
+ }
+ }
+
+ inline void Flush() {
+ TFlush f;
+ Streams_.ForEach(f);
+ Slave_->Flush(); // see SEARCH-1030
+ }
+
+ inline void Finish() {
+ if (Finished_) {
+ return;
+ }
+
+ TFinish f;
+ Streams_.ForEach(f);
+ Slave_->Finish(); // see SEARCH-1030
+
+ Finished_ = true;
+ }
+
+ inline const THttpHeaders& SentHeaders() const noexcept {
+ return Headers_;
+ }
+
+ inline void EnableCompression(TArrayRef<const TStringBuf> schemas) {
+ ComprSchemas_ = schemas;
+ }
+
+ inline void EnableKeepAlive(bool enable) {
+ KeepAliveEnabled_ = enable;
+ }
+
+ inline void EnableBodyEncoding(bool enable) {
+ BodyEncodingEnabled_ = enable;
+ }
+
+ inline void EnableCompressionHeader(bool enable) {
+ CompressionHeaderEnabled_ = enable;
+ }
+
+ inline bool IsCompressionEnabled() const noexcept {
+ return !ComprSchemas_.empty();
+ }
+
+ inline bool IsKeepAliveEnabled() const noexcept {
+ return KeepAliveEnabled_;
+ }
+
+ inline bool IsBodyEncodingEnabled() const noexcept {
+ return BodyEncodingEnabled_;
+ }
+
+ inline bool IsCompressionHeaderEnabled() const noexcept {
+ return CompressionHeaderEnabled_;
+ }
+
+ inline bool CanBeKeepAlive() const noexcept {
+ return SupportChunkedTransfer() && IsKeepAliveEnabled() && (Request_ ? Request_->IsKeepAlive() : true);
+ }
+
+ inline const TString& FirstLine() const noexcept {
+ return FirstLine_;
+ }
+
+ inline size_t SentSize() const noexcept {
+ return SizeCalculator_.Length();
+ }
+
+private:
+ static inline bool IsResponse(const TString& s) noexcept {
+ return strnicmp(s.data(), "HTTP/", 5) == 0;
+ }
+
+ static inline bool IsRequest(const TString& s) noexcept {
+ return !IsResponse(s);
+ }
+
+ inline bool IsHttpRequest() const noexcept {
+ return IsRequest(FirstLine_);
+ }
+
+ inline bool HasResponseBody() const noexcept {
+ if (IsHttpResponse()) {
+ if (Request_ && Request_->FirstLine().StartsWith(TStringBuf("HEAD")))
+ return false;
+ if (FirstLine_.size() > 9 && strncmp(FirstLine_.data() + 9, "204", 3) == 0)
+ return false;
+ return true;
+ }
+ return false;
+ }
+
+ inline bool IsHttpResponse() const noexcept {
+ return IsResponse(FirstLine_);
+ }
+
+ inline bool HasRequestBody() const noexcept {
+ return strnicmp(FirstLine_.data(), "POST", 4) == 0 ||
+ strnicmp(FirstLine_.data(), "PATCH", 5) == 0 ||
+ strnicmp(FirstLine_.data(), "PUT", 3) == 0;
+ }
+ static inline size_t ParseHttpVersion(const TString& s) {
+ if (s.empty()) {
+ ythrow THttpParseException() << "malformed http stream";
+ }
+
+ size_t parsed_version = 0;
+
+ if (IsResponse(s)) {
+ const char* b = s.data() + 5;
+
+ while (*b && *b != ' ') {
+ if (*b != '.') {
+ parsed_version *= 10;
+ parsed_version += (*b - '0');
+ }
+
+ ++b;
+ }
+ } else {
+ /*
+ * s not empty here
+ */
+ const char* e = s.end() - 1;
+ const char* b = s.begin();
+ size_t mult = 1;
+
+ while (e != b && *e != '/') {
+ if (*e != '.') {
+ parsed_version += (*e - '0') * mult;
+ mult *= 10;
+ }
+
+ --e;
+ }
+ }
+
+ return parsed_version * 100;
+ }
+
+ inline void ParseHttpVersion() {
+ size_t parsed_version = ParseHttpVersion(FirstLine_);
+
+ if (Request_) {
+ parsed_version = Min(parsed_version, ParseHttpVersion(Request_->FirstLine()));
+ }
+
+ Version_ = parsed_version;
+ }
+
+ inline void Process(const TString& s) {
+ Y_ASSERT(State_ != HeadersSent);
+
+ if (State_ == Begin) {
+ FirstLine_ = s;
+ ParseHttpVersion();
+ State_ = FirstLineSent;
+ } else {
+ if (s.empty()) {
+ BuildOutputStream();
+ WriteCached();
+ State_ = HeadersSent;
+ } else {
+ AddHeader(THttpInputHeader(s));
+ }
+ }
+ }
+
+ inline void WriteCachedImpl(IOutputStream* s) const {
+ s->Write(FirstLine_.data(), FirstLine_.size());
+ s->Write("\r\n", 2);
+ Headers_.OutTo(s);
+ s->Write("\r\n", 2);
+ s->Finish();
+ }
+
+ inline void WriteCached() {
+ size_t buflen = 0;
+
+ {
+ TSizeCalculator out;
+
+ WriteCachedImpl(&out);
+ buflen = out.Length();
+ }
+
+ {
+ TBufferedOutput out(Slave_, buflen);
+
+ WriteCachedImpl(&out);
+ }
+
+ if (IsHttpRequest() && !HasRequestBody()) {
+ /*
+ * if this is http request, then send it now
+ */
+
+ Slave_->Flush();
+ }
+ }
+
+ inline bool SupportChunkedTransfer() const noexcept {
+ return Version_ >= 1100;
+ }
+
+ inline void BuildOutputStream() {
+ if (CanBeKeepAlive()) {
+ AddOrReplaceHeader(THttpInputHeader("Connection", "Keep-Alive"));
+ } else {
+ AddOrReplaceHeader(THttpInputHeader("Connection", "Close"));
+ }
+
+ if (IsHttpResponse()) {
+ if (Request_ && IsCompressionEnabled() && HasResponseBody()) {
+ TString scheme = Request_->BestCompressionScheme(ComprSchemas_);
+ if (scheme != "identity") {
+ AddOrReplaceHeader(THttpInputHeader("Content-Encoding", scheme));
+ RemoveHeader("Content-Length");
+ }
+ }
+
+ RebuildStream();
+ } else {
+ if (IsCompressionEnabled()) {
+ AddOrReplaceHeader(THttpInputHeader("Accept-Encoding", BuildAcceptEncoding()));
+ }
+ if (HasRequestBody()) {
+ RebuildStream();
+ }
+ }
+ }
+
+ inline TString BuildAcceptEncoding() const {
+ TString ret;
+
+ for (const auto& coding : ComprSchemas_) {
+ if (ret) {
+ ret += ", ";
+ }
+
+ ret += coding;
+ }
+
+ return ret;
+ }
+
+ inline void RebuildStream() {
+ bool keepAlive = false;
+ const TCompressionCodecFactory::TEncoderConstructor* encoder = nullptr;
+ bool chunked = false;
+ bool haveContentLength = false;
+
+ for (THttpHeaders::TConstIterator h = Headers_.Begin(); h != Headers_.End(); ++h) {
+ const THttpInputHeader& header = *h;
+ const TString hl = to_lower(header.Name());
+
+ if (hl == TStringBuf("connection")) {
+ keepAlive = to_lower(header.Value()) == TStringBuf("keep-alive");
+ } else if (IsCompressionHeaderEnabled() && hl == TStringBuf("content-encoding")) {
+ encoder = TCompressionCodecFactory::Instance().FindEncoder(to_lower(header.Value()));
+ } else if (hl == TStringBuf("transfer-encoding")) {
+ chunked = to_lower(header.Value()) == TStringBuf("chunked");
+ } else if (hl == TStringBuf("content-length")) {
+ haveContentLength = true;
+ }
+ }
+
+ if (!haveContentLength && !chunked && (IsHttpRequest() || HasResponseBody()) && SupportChunkedTransfer() && (keepAlive || encoder || IsHttpRequest())) {
+ AddHeader(THttpInputHeader("Transfer-Encoding", "chunked"));
+ chunked = true;
+ }
+
+ if (IsBodyEncodingEnabled() && chunked) {
+ Output_ = Streams_.Add(new TChunkedOutput(Output_));
+ }
+
+ Output_ = Streams_.Add(new TTeeOutput(Output_, &SizeCalculator_));
+
+ if (IsBodyEncodingEnabled() && encoder) {
+ Output_ = Streams_.Add((*encoder)(Output_).Release());
+ }
+ }
+
+ inline void AddHeader(const THttpInputHeader& hdr) {
+ Headers_.AddHeader(hdr);
+ }
+
+ inline void AddOrReplaceHeader(const THttpInputHeader& hdr) {
+ Headers_.AddOrReplaceHeader(hdr);
+ }
+
+ inline void RemoveHeader(const TString& hdr) {
+ Headers_.RemoveHeader(hdr);
+ }
+
+private:
+ IOutputStream* Slave_;
+ TState State_;
+ IOutputStream* Output_;
+ TStreams<IOutputStream, 8> Streams_;
+ TString Line_;
+ TString FirstLine_;
+ THttpHeaders Headers_;
+ THttpInput* Request_;
+ size_t Version_;
+
+ TArrayRef<const TStringBuf> ComprSchemas_;
+
+ bool KeepAliveEnabled_;
+ bool BodyEncodingEnabled_;
+ bool CompressionHeaderEnabled_;
+
+ bool Finished_;
+
+ TSizeCalculator SizeCalculator_;
+};
+
+THttpOutput::THttpOutput(IOutputStream* slave)
+ : Impl_(new TImpl(slave, nullptr))
+{
+}
+
+THttpOutput::THttpOutput(IOutputStream* slave, THttpInput* request)
+ : Impl_(new TImpl(slave, request))
+{
+}
+
+THttpOutput::~THttpOutput() {
+ try {
+ Finish();
+ } catch (...) {
+ }
+}
+
+void THttpOutput::DoWrite(const void* buf, size_t len) {
+ Impl_->Write(buf, len);
+}
+
+void THttpOutput::DoFlush() {
+ Impl_->Flush();
+}
+
+void THttpOutput::DoFinish() {
+ Impl_->Finish();
+}
+
+const THttpHeaders& THttpOutput::SentHeaders() const noexcept {
+ return Impl_->SentHeaders();
+}
+
+void THttpOutput::EnableCompression(bool enable) {
+ if (enable) {
+ EnableCompression(TCompressionCodecFactory::Instance().GetBestCodecs());
+ } else {
+ TArrayRef<TStringBuf> codings;
+ EnableCompression(codings);
+ }
+}
+
+void THttpOutput::EnableCompression(TArrayRef<const TStringBuf> schemas) {
+ Impl_->EnableCompression(schemas);
+}
+
+void THttpOutput::EnableKeepAlive(bool enable) {
+ Impl_->EnableKeepAlive(enable);
+}
+
+void THttpOutput::EnableBodyEncoding(bool enable) {
+ Impl_->EnableBodyEncoding(enable);
+}
+
+void THttpOutput::EnableCompressionHeader(bool enable) {
+ Impl_->EnableCompressionHeader(enable);
+}
+
+bool THttpOutput::IsKeepAliveEnabled() const noexcept {
+ return Impl_->IsKeepAliveEnabled();
+}
+
+bool THttpOutput::IsBodyEncodingEnabled() const noexcept {
+ return Impl_->IsBodyEncodingEnabled();
+}
+
+bool THttpOutput::IsCompressionEnabled() const noexcept {
+ return Impl_->IsCompressionEnabled();
+}
+
+bool THttpOutput::IsCompressionHeaderEnabled() const noexcept {
+ return Impl_->IsCompressionHeaderEnabled();
+}
+
+bool THttpOutput::CanBeKeepAlive() const noexcept {
+ return Impl_->CanBeKeepAlive();
+}
+
+void THttpOutput::SendContinue() {
+ Impl_->SendContinue();
+}
+
+const TString& THttpOutput::FirstLine() const noexcept {
+ return Impl_->FirstLine();
+}
+
+size_t THttpOutput::SentSize() const noexcept {
+ return Impl_->SentSize();
+}
+
+unsigned ParseHttpRetCode(const TStringBuf& ret) {
+ const TStringBuf code = StripString(StripString(ret.After(' ')).Before(' '));
+
+ return FromString<unsigned>(code.data(), code.size());
+}
+
+void SendMinimalHttpRequest(TSocket& s, const TStringBuf& host, const TStringBuf& request, const TStringBuf& agent, const TStringBuf& from) {
+ TSocketOutput so(s);
+ THttpOutput output(&so);
+
+ output.EnableKeepAlive(false);
+ output.EnableCompression(false);
+
+ const IOutputStream::TPart parts[] = {
+ IOutputStream::TPart(TStringBuf("GET ")),
+ IOutputStream::TPart(request),
+ IOutputStream::TPart(TStringBuf(" HTTP/1.1")),
+ IOutputStream::TPart::CrLf(),
+ IOutputStream::TPart(TStringBuf("Host: ")),
+ IOutputStream::TPart(host),
+ IOutputStream::TPart::CrLf(),
+ IOutputStream::TPart(TStringBuf("User-Agent: ")),
+ IOutputStream::TPart(agent),
+ IOutputStream::TPart::CrLf(),
+ IOutputStream::TPart(TStringBuf("From: ")),
+ IOutputStream::TPart(from),
+ IOutputStream::TPart::CrLf(),
+ IOutputStream::TPart::CrLf(),
+ };
+
+ output.Write(parts, sizeof(parts) / sizeof(*parts));
+ output.Finish();
+}
+
+TArrayRef<const TStringBuf> SupportedCodings() {
+ return TCompressionCodecFactory::Instance().GetBestCodecs();
+}
diff --git a/library/cpp/http/io/stream.h b/library/cpp/http/io/stream.h
new file mode 100644
index 0000000000..78ca4fc814
--- /dev/null
+++ b/library/cpp/http/io/stream.h
@@ -0,0 +1,178 @@
+#pragma once
+
+#include "headers.h"
+
+#include <util/stream/output.h>
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/strbuf.h>
+#include <util/generic/yexception.h>
+#include <util/generic/array_ref.h>
+
+class TSocket;
+
+struct THttpException: public yexception {
+};
+
+struct THttpParseException: public THttpException {
+};
+
+struct THttpReadException: public THttpException {
+};
+
+/// Чтение ответа HTTP-сервера.
+class THttpInput: public IInputStream {
+public:
+ THttpInput(IInputStream* slave);
+ THttpInput(THttpInput&& httpInput);
+ ~THttpInput() override;
+
+ /*
+ * parsed http headers
+ */
+ /// Возвращает контейнер с заголовками ответа HTTP-сервера.
+ const THttpHeaders& Headers() const noexcept;
+
+ /*
+ * parsed http trailers
+ */
+ /// Возвращает контейнер (возможно пустой) с trailer'ами ответа HTTP-сервера.
+ /// Поток должен быть вычитан полностью прежде чем trailer'ы будут доступны.
+ /// Пока поток не вычитан до конца возвращается Nothing.
+ /// https://tools.ietf.org/html/rfc7230#section-4.1.2
+ const TMaybe<THttpHeaders>& Trailers() const noexcept;
+
+ /*
+ * first line - response or request
+ */
+ /// Возвращает первую строку ответа HTTP-сервера.
+ /// @details Первая строка HTTP-сервера - строка состояния,
+ /// содержащая три поля: версию HTTP, код состояния и описание.
+ const TString& FirstLine() const noexcept;
+
+ /*
+ * connection can be keep-alive
+ */
+ /// Проверяет, не завершено ли соединение с сервером.
+ /// @details Транзакция считается завершенной, если не передан заголовок
+ /// "Connection: Keep Alive".
+ bool IsKeepAlive() const noexcept;
+
+ /*
+ * output data can be encoded
+ */
+ /// Проверяет, поддерживается ли данный тип кодирования содержимого
+ /// ответа HTTP-сервера.
+ bool AcceptEncoding(const TString& coding) const;
+
+ /// Пытается определить наилучший тип кодирования ответа HTTP-сервера.
+ /// @details Если ответ сервера говорит о том, что поддерживаются
+ /// любые типы кодирования, выбирается gzip. В противном случае
+ /// из списка типов кодирования выбирается лучший из поддерживаемых сервером.
+ TString BestCompressionScheme() const;
+ TString BestCompressionScheme(TArrayRef<const TStringBuf> codings) const;
+
+ /// Если заголовки содержат Content-Length, возвращает true и
+ /// записывает значение из заголовка в value
+ bool GetContentLength(ui64& value) const noexcept;
+
+ /// Признак запакованности данных, - если выставлен, то Content-Length, при наличии в заголовках,
+ /// показывает объём запакованных данных, а из THttpInput мы будем вычитывать уже распакованные.
+ bool ContentEncoded() const noexcept;
+
+ /// Returns true if Content-Length or Transfer-Encoding header received
+ bool HasContent() const noexcept;
+
+ bool HasExpect100Continue() const noexcept;
+
+private:
+ size_t DoRead(void* buf, size_t len) override;
+ size_t DoSkip(size_t len) override;
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
+/// Передача запроса HTTP-серверу.
+class THttpOutput: public IOutputStream {
+public:
+ THttpOutput(IOutputStream* slave);
+ THttpOutput(IOutputStream* slave, THttpInput* request);
+ ~THttpOutput() override;
+
+ /*
+ * sent http headers
+ */
+ /// Возвращает контейнер с заголовками запроса к HTTP-серверу.
+ const THttpHeaders& SentHeaders() const noexcept;
+
+ /// Устанавливает режим, при котором сервер выдает ответ в упакованном виде.
+ void EnableCompression(bool enable);
+ void EnableCompression(TArrayRef<const TStringBuf> schemas);
+
+ /// Устанавливает режим, при котором соединение с сервером не завершается
+ /// после окончания транзакции.
+ void EnableKeepAlive(bool enable);
+
+ /// Устанавливает режим, при котором тело HTTP-запроса/ответа преобразуется в соответствии
+ /// с заголовками Content-Encoding и Transfer-Encoding (включен по умолчанию)
+ void EnableBodyEncoding(bool enable);
+
+ /// Устанавливает режим, при котором тело HTTP-ответа сжимается кодеком
+ /// указанным в Content-Encoding (включен по умолчанию)
+ void EnableCompressionHeader(bool enable);
+
+ /// Проверяет, производится ли выдача ответов в упакованном виде.
+ bool IsCompressionEnabled() const noexcept;
+
+ /// Проверяет, не завершается ли соединение с сервером после окончания транзакции.
+ bool IsKeepAliveEnabled() const noexcept;
+
+ /// Проверяет, преобразуется ли тело HTTP-запроса/ответа в соответствии
+ /// с заголовками Content-Encoding и Transfer-Encoding
+ bool IsBodyEncodingEnabled() const noexcept;
+
+ /// Проверяет, сжимается ли тело HTTP-ответа кодеком
+ /// указанным в Content-Encoding
+ bool IsCompressionHeaderEnabled() const noexcept;
+
+ /*
+ * is this connection can be really keep-alive
+ */
+ /// Проверяет, можно ли установить режим, при котором соединение с сервером
+ /// не завершается после окончания транзакции.
+ bool CanBeKeepAlive() const noexcept;
+
+ void SendContinue();
+
+ /*
+ * first line - response or request
+ */
+ /// Возвращает первую строку HTTP-запроса/ответа
+ const TString& FirstLine() const noexcept;
+
+ /// Возвращает размер отправленных данных (без заголовков, с учётом сжатия, без
+ /// учёта chunked transfer encoding)
+ size_t SentSize() const noexcept;
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+ void DoFlush() override;
+ void DoFinish() override;
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
+/// Возвращает код состояния из ответа сервера.
+unsigned ParseHttpRetCode(const TStringBuf& ret);
+
+/// Отправляет HTTP-серверу запрос с минимумом необходимых заголовков.
+void SendMinimalHttpRequest(TSocket& s, const TStringBuf& host, const TStringBuf& request, const TStringBuf& agent = "YandexSomething/1.0", const TStringBuf& from = "webadmin@yandex.ru");
+
+TArrayRef<const TStringBuf> SupportedCodings();
+
+/// @}
diff --git a/library/cpp/http/io/stream_ut.cpp b/library/cpp/http/io/stream_ut.cpp
new file mode 100644
index 0000000000..1ea35df675
--- /dev/null
+++ b/library/cpp/http/io/stream_ut.cpp
@@ -0,0 +1,732 @@
+#include "stream.h"
+#include "chunk.h"
+
+#include <library/cpp/http/server/http_ex.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/testing/unittest/tests_data.h>
+
+#include <util/string/printf.h>
+#include <util/network/socket.h>
+#include <util/stream/file.h>
+#include <util/stream/output.h>
+#include <util/stream/tee.h>
+#include <util/stream/zlib.h>
+#include <util/stream/null.h>
+
+Y_UNIT_TEST_SUITE(THttpStreamTest) {
+ class TTestHttpServer: public THttpServer::ICallBack {
+ class TRequest: public THttpClientRequestEx {
+ public:
+ inline TRequest(TTestHttpServer* parent)
+ : Parent_(parent)
+ {
+ }
+
+ bool Reply(void* /*tsr*/) override {
+ if (!ProcessHeaders()) {
+ return true;
+ }
+
+ // Check that function will not hang on
+ Input().ReadAll();
+
+ // "lo" is for "local"
+ if (RD.ServerName() == "yandex.lo") {
+ // do redirect
+ Output() << "HTTP/1.1 301 Moved permanently\r\n"
+ "Location: http://www.yandex.lo\r\n"
+ "\r\n";
+ } else if (RD.ServerName() == "www.yandex.lo") {
+ Output() << "HTTP/1.1 200 Ok\r\n"
+ "\r\n";
+ } else {
+ Output() << "HTTP/1.1 200 Ok\r\n\r\n";
+ if (Buf.Size()) {
+ Output().Write(Buf.AsCharPtr(), Buf.Size());
+ } else {
+ Output() << Parent_->Res_;
+ }
+ }
+ Output().Finish();
+
+ Parent_->LastRequestSentSize_ = Output().SentSize();
+
+ return true;
+ }
+
+ private:
+ TTestHttpServer* Parent_ = nullptr;
+ };
+
+ public:
+ inline TTestHttpServer(const TString& res)
+ : Res_(res)
+ {
+ }
+
+ TClientRequest* CreateClient() override {
+ return new TRequest(this);
+ }
+
+ size_t LastRequestSentSize() const {
+ return LastRequestSentSize_;
+ }
+
+ private:
+ TString Res_;
+ size_t LastRequestSentSize_ = 0;
+ };
+
+ Y_UNIT_TEST(TestCodings1) {
+ UNIT_ASSERT(SupportedCodings().size() > 0);
+ }
+
+ Y_UNIT_TEST(TestHttpInput) {
+ TString res = "I'm a teapot";
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+
+ TTestHttpServer serverImpl(res);
+ THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true));
+
+ UNIT_ASSERT(server.Start());
+
+ TNetworkAddress addr("localhost", port);
+ TSocket s(addr);
+
+ //TDebugOutput dbg;
+ TNullOutput dbg;
+
+ {
+ TSocketOutput so(s);
+ TTeeOutput out(&so, &dbg);
+ THttpOutput output(&out);
+
+ output.EnableKeepAlive(true);
+ output.EnableCompression(true);
+
+ TString r;
+ r += "GET / HTTP/1.1";
+ r += "\r\n";
+ r += "Host: yandex.lo";
+ r += "\r\n";
+ r += "\r\n";
+
+ output.Write(r.data(), r.size());
+ output.Finish();
+ }
+
+ {
+ TSocketInput si(s);
+ THttpInput input(&si);
+ unsigned httpCode = ParseHttpRetCode(input.FirstLine());
+ UNIT_ASSERT_VALUES_EQUAL(httpCode / 10, 30u);
+
+ TransferData(&input, &dbg);
+ }
+ server.Stop();
+ }
+
+ Y_UNIT_TEST(TestHttpInputDelete) {
+ TString res = "I'm a teapot";
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+
+ TTestHttpServer serverImpl(res);
+ THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true));
+
+ UNIT_ASSERT(server.Start());
+
+ TNetworkAddress addr("localhost", port);
+ TSocket s(addr);
+
+ //TDebugOutput dbg;
+ TNullOutput dbg;
+
+ {
+ TSocketOutput so(s);
+ TTeeOutput out(&so, &dbg);
+ THttpOutput output(&out);
+
+ output.EnableKeepAlive(true);
+ output.EnableCompression(true);
+
+ TString r;
+ r += "DELETE / HTTP/1.1";
+ r += "\r\n";
+ r += "Host: yandex.lo";
+ r += "\r\n";
+ r += "\r\n";
+
+ output.Write(r.data(), r.size());
+ output.Finish();
+ }
+
+ {
+ TSocketInput si(s);
+ THttpInput input(&si);
+ unsigned httpCode = ParseHttpRetCode(input.FirstLine());
+ UNIT_ASSERT_VALUES_EQUAL(httpCode / 10, 30u);
+
+ TransferData(&input, &dbg);
+ }
+ server.Stop();
+ }
+
+ Y_UNIT_TEST(TestParseHttpRetCode) {
+ UNIT_ASSERT_VALUES_EQUAL(ParseHttpRetCode("HTTP/1.1 301"), 301u);
+ }
+
+ Y_UNIT_TEST(TestKeepAlive) {
+ {
+ TString s = "GET / HTTP/1.0\r\n\r\n";
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(!in.IsKeepAlive());
+ }
+
+ {
+ TString s = "GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n";
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(in.IsKeepAlive());
+ }
+
+ {
+ TString s = "GET / HTTP/1.1\r\n\r\n";
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(in.IsKeepAlive());
+ }
+
+ {
+ TString s = "GET / HTTP/1.1\r\nConnection: close\r\n\r\n";
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(!in.IsKeepAlive());
+ }
+
+ {
+ TString s = "HTTP/1.0 200 Ok\r\n\r\n";
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(!in.IsKeepAlive());
+ }
+
+ {
+ TString s = "HTTP/1.0 200 Ok\r\nConnection: keep-alive\r\n\r\n";
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(in.IsKeepAlive());
+ }
+
+ {
+ TString s = "HTTP/1.1 200 Ok\r\n\r\n";
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(in.IsKeepAlive());
+ }
+
+ {
+ TString s = "HTTP/1.1 200 Ok\r\nConnection: close\r\n\r\n";
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(!in.IsKeepAlive());
+ }
+ }
+
+ Y_UNIT_TEST(TestMinRequest) {
+ TString res = "qqqqqq";
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+
+ TTestHttpServer serverImpl(res);
+ THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true));
+
+ UNIT_ASSERT(server.Start());
+
+ TNetworkAddress addr("localhost", port);
+
+ TSocket s(addr);
+ TNullOutput dbg;
+
+ SendMinimalHttpRequest(s, "www.yandex.lo", "/");
+
+ TSocketInput si(s);
+ THttpInput input(&si);
+ unsigned httpCode = ParseHttpRetCode(input.FirstLine());
+ UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u);
+
+ TransferData(&input, &dbg);
+ server.Stop();
+ }
+
+ Y_UNIT_TEST(TestResponseWithBlanks) {
+ TString res = "qqqqqq\r\n\r\nsdasdsad\r\n";
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+
+ TTestHttpServer serverImpl(res);
+ THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true));
+
+ UNIT_ASSERT(server.Start());
+
+ TNetworkAddress addr("localhost", port);
+
+ TSocket s(addr);
+
+ SendMinimalHttpRequest(s, "www.yandex.ru", "/");
+
+ TSocketInput si(s);
+ THttpInput input(&si);
+ unsigned httpCode = ParseHttpRetCode(input.FirstLine());
+ UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u);
+ TString reply = input.ReadAll();
+ UNIT_ASSERT_VALUES_EQUAL(reply, res);
+ server.Stop();
+ }
+
+ Y_UNIT_TEST(TestOutputFlush) {
+ TString str;
+ TStringOutput strOut(str);
+ TBufferedOutput bufOut(&strOut, 8192);
+ THttpOutput httpOut(&bufOut);
+
+ httpOut.EnableKeepAlive(true);
+ httpOut.EnableCompression(true);
+
+ const char* header = "GET / HTTP/1.1\r\nHost: yandex.ru\r\n\r\n";
+ httpOut << header;
+
+ unsigned curLen = str.size();
+ const char* body = "<html>Hello</html>";
+ httpOut << body;
+ UNIT_ASSERT_VALUES_EQUAL(curLen, str.size());
+ httpOut.Flush();
+ UNIT_ASSERT_VALUES_EQUAL(curLen + strlen(body), str.size());
+ }
+
+ Y_UNIT_TEST(TestOutputPostFlush) {
+ TString str;
+ TString checkStr;
+ TStringOutput strOut(str);
+ TStringOutput checkOut(checkStr);
+ TBufferedOutput bufOut(&strOut, 8192);
+ TTeeOutput teeOut(&bufOut, &checkOut);
+ THttpOutput httpOut(&teeOut);
+
+ httpOut.EnableKeepAlive(true);
+ httpOut.EnableCompression(true);
+
+ const char* header = "POST / HTTP/1.1\r\nHost: yandex.ru\r\n\r\n";
+ httpOut << header;
+
+ UNIT_ASSERT_VALUES_EQUAL(str.size(), 0u);
+
+ const char* body = "<html>Hello</html>";
+ httpOut << body;
+ UNIT_ASSERT_VALUES_EQUAL(str.size(), 0u);
+
+ httpOut.Flush();
+ UNIT_ASSERT_VALUES_EQUAL(checkStr.size(), str.size());
+ }
+
+ TString MakeHttpOutputBody(const char* body, bool encodingEnabled) {
+ TString str;
+ TStringOutput strOut(str);
+ {
+ TBufferedOutput bufOut(&strOut, 8192);
+ THttpOutput httpOut(&bufOut);
+
+ httpOut.EnableKeepAlive(true);
+ httpOut.EnableCompression(true);
+ httpOut.EnableBodyEncoding(encodingEnabled);
+
+ httpOut << "POST / HTTP/1.1\r\n";
+ httpOut << "Host: yandex.ru\r\n";
+ httpOut << "Content-Encoding: gzip\r\n";
+ httpOut << "\r\n";
+
+ UNIT_ASSERT_VALUES_EQUAL(str.size(), 0u);
+ httpOut << body;
+ }
+ const char* bodyDelimiter = "\r\n\r\n";
+ size_t bodyPos = str.find(bodyDelimiter);
+ UNIT_ASSERT(bodyPos != TString::npos);
+ return str.substr(bodyPos + strlen(bodyDelimiter));
+ };
+
+ TString SimulateBodyEncoding(const char* body) {
+ TString bodyStr;
+ TStringOutput bodyOut(bodyStr);
+ TChunkedOutput chunkOut(&bodyOut);
+ TZLibCompress comprOut(&chunkOut, ZLib::GZip);
+ comprOut << body;
+ return bodyStr;
+ };
+
+ Y_UNIT_TEST(TestRebuildStreamOnPost) {
+ const char* body = "<html>Hello</html>";
+ UNIT_ASSERT(MakeHttpOutputBody(body, false) == body);
+ UNIT_ASSERT(MakeHttpOutputBody(body, true) == SimulateBodyEncoding(body));
+ }
+
+ Y_UNIT_TEST(TestOutputFinish) {
+ TString str;
+ TStringOutput strOut(str);
+ TBufferedOutput bufOut(&strOut, 8192);
+ THttpOutput httpOut(&bufOut);
+
+ httpOut.EnableKeepAlive(true);
+ httpOut.EnableCompression(true);
+
+ const char* header = "GET / HTTP/1.1\r\nHost: yandex.ru\r\n\r\n";
+ httpOut << header;
+
+ unsigned curLen = str.size();
+ const char* body = "<html>Hello</html>";
+ httpOut << body;
+ UNIT_ASSERT_VALUES_EQUAL(curLen, str.size());
+ httpOut.Finish();
+ UNIT_ASSERT_VALUES_EQUAL(curLen + strlen(body), str.size());
+ }
+
+ Y_UNIT_TEST(TestMultilineHeaders) {
+ const char* headerLine0 = "HTTP/1.1 200 OK";
+ const char* headerLine1 = "Content-Language: en";
+ const char* headerLine2 = "Vary: Accept-Encoding, ";
+ const char* headerLine3 = "\tAccept-Language";
+ const char* headerLine4 = "Content-Length: 18";
+
+ TString endLine("\r\n");
+ TString r;
+ r += headerLine0 + endLine;
+ r += headerLine1 + endLine;
+ r += headerLine2 + endLine;
+ r += headerLine3 + endLine;
+ r += headerLine4 + endLine + endLine;
+ r += "<html>Hello</html>";
+ TStringInput stringInput(r);
+ THttpInput input(&stringInput);
+
+ const THttpHeaders& httpHeaders = input.Headers();
+ UNIT_ASSERT_VALUES_EQUAL(httpHeaders.Count(), 3u);
+
+ THttpHeaders::TConstIterator it = httpHeaders.Begin();
+ UNIT_ASSERT_VALUES_EQUAL(it->ToString(), TString(headerLine1));
+ UNIT_ASSERT_VALUES_EQUAL((++it)->ToString(), TString::Join(headerLine2, headerLine3));
+ UNIT_ASSERT_VALUES_EQUAL((++it)->ToString(), TString(headerLine4));
+ }
+
+ Y_UNIT_TEST(ContentLengthRemoval) {
+ TMemoryInput request("GET / HTTP/1.1\r\nAccept-Encoding: gzip\r\n\r\n");
+ THttpInput i(&request);
+ TString result;
+ TStringOutput out(result);
+ THttpOutput httpOut(&out, &i);
+
+ httpOut.EnableKeepAlive(true);
+ httpOut.EnableCompression(true);
+ httpOut << "HTTP/1.1 200 OK\r\n";
+ char answer[] = "Mary had a little lamb.";
+ httpOut << "Content-Length: " << strlen(answer) << "\r\n"
+ "\r\n";
+ httpOut << answer;
+ httpOut.Finish();
+
+ Cdbg << result;
+ result.to_lower();
+ UNIT_ASSERT(result.Contains("content-encoding: gzip"));
+ UNIT_ASSERT(!result.Contains("content-length"));
+ }
+
+ Y_UNIT_TEST(CodecsPriority) {
+ TMemoryInput request("GET / HTTP/1.1\r\nAccept-Encoding: gzip, br\r\n\r\n");
+ TVector<TStringBuf> codecs = {"br", "gzip"};
+
+ THttpInput i(&request);
+ TString result;
+ TStringOutput out(result);
+ THttpOutput httpOut(&out, &i);
+
+ httpOut.EnableKeepAlive(true);
+ httpOut.EnableCompression(codecs);
+ httpOut << "HTTP/1.1 200 OK\r\n";
+ char answer[] = "Mary had a little lamb.";
+ httpOut << "Content-Length: " << strlen(answer) << "\r\n"
+ "\r\n";
+ httpOut << answer;
+ httpOut.Finish();
+
+ Cdbg << result;
+ result.to_lower();
+ UNIT_ASSERT(result.Contains("content-encoding: br"));
+ }
+
+ Y_UNIT_TEST(CodecsPriority2) {
+ TMemoryInput request("GET / HTTP/1.1\r\nAccept-Encoding: gzip, br\r\n\r\n");
+ TVector<TStringBuf> codecs = {"gzip", "br"};
+
+ THttpInput i(&request);
+ TString result;
+ TStringOutput out(result);
+ THttpOutput httpOut(&out, &i);
+
+ httpOut.EnableKeepAlive(true);
+ httpOut.EnableCompression(codecs);
+ httpOut << "HTTP/1.1 200 OK\r\n";
+ char answer[] = "Mary had a little lamb.";
+ httpOut << "Content-Length: " << strlen(answer) << "\r\n"
+ "\r\n";
+ httpOut << answer;
+ httpOut.Finish();
+
+ Cdbg << result;
+ result.to_lower();
+ UNIT_ASSERT(result.Contains("content-encoding: gzip"));
+ }
+
+ Y_UNIT_TEST(HasTrailers) {
+ TMemoryInput response(
+ "HTTP/1.1 200 OK\r\n"
+ "Transfer-Encoding: chunked\r\n"
+ "\r\n"
+ "3\r\n"
+ "foo"
+ "0\r\n"
+ "Bar: baz\r\n"
+ "\r\n");
+ THttpInput i(&response);
+ TMaybe<THttpHeaders> trailers = i.Trailers();
+ UNIT_ASSERT(!trailers.Defined());
+ i.ReadAll();
+ trailers = i.Trailers();
+ UNIT_ASSERT_VALUES_EQUAL(trailers.GetRef().Count(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(trailers.GetRef().Begin()->ToString(), "Bar: baz");
+ }
+
+ Y_UNIT_TEST(NoTrailersWithChunks) {
+ TMemoryInput response(
+ "HTTP/1.1 200 OK\r\n"
+ "Transfer-Encoding: chunked\r\n"
+ "\r\n"
+ "3\r\n"
+ "foo"
+ "0\r\n"
+ "\r\n");
+ THttpInput i(&response);
+ TMaybe<THttpHeaders> trailers = i.Trailers();
+ UNIT_ASSERT(!trailers.Defined());
+ i.ReadAll();
+ trailers = i.Trailers();
+ UNIT_ASSERT_VALUES_EQUAL(trailers.GetRef().Count(), 0);
+ }
+
+ Y_UNIT_TEST(NoTrailersNoChunks) {
+ TMemoryInput response(
+ "HTTP/1.1 200 OK\r\n"
+ "Content-Length: 3\r\n"
+ "\r\n"
+ "bar");
+ THttpInput i(&response);
+ TMaybe<THttpHeaders> trailers = i.Trailers();
+ UNIT_ASSERT(!trailers.Defined());
+ i.ReadAll();
+ trailers = i.Trailers();
+ UNIT_ASSERT_VALUES_EQUAL(trailers.GetRef().Count(), 0);
+ }
+
+ Y_UNIT_TEST(RequestWithoutContentLength) {
+ TStringStream request;
+ {
+ THttpOutput httpOutput(&request);
+ httpOutput << "POST / HTTP/1.1\r\n"
+ "Host: yandex.ru\r\n"
+ "\r\n";
+ httpOutput << "GGLOL";
+ }
+ {
+ TStringInput input(request.Str());
+ THttpInput httpInput(&input);
+ bool chunkedOrHasContentLength = false;
+ for (const auto& header : httpInput.Headers()) {
+ if (header.Name() == "Transfer-Encoding" && header.Value() == "chunked" || header.Name() == "Content-Length") {
+ chunkedOrHasContentLength = true;
+ }
+ }
+
+ // If request doesn't contain neither Content-Length header nor Transfer-Encoding header
+ // then server considers message body length to be zero.
+ // (See https://tools.ietf.org/html/rfc7230#section-3.3.3)
+ UNIT_ASSERT(chunkedOrHasContentLength);
+
+ UNIT_ASSERT_VALUES_EQUAL(httpInput.ReadAll(), "GGLOL");
+ }
+ }
+
+ Y_UNIT_TEST(TestInputHasContent) {
+ {
+ TStringStream request;
+ request << "POST / HTTP/1.1\r\n"
+ "Host: yandex.ru\r\n"
+ "\r\n";
+ request << "HTTPDATA";
+
+ TStringInput input(request.Str());
+ THttpInput httpInput(&input);
+
+ UNIT_ASSERT(!httpInput.HasContent());
+ UNIT_ASSERT_VALUES_EQUAL(httpInput.ReadAll(), "");
+ }
+
+ {
+ TStringStream request;
+ request << "POST / HTTP/1.1\r\n"
+ "Host: yandex.ru\r\n"
+ "Content-Length: 8"
+ "\r\n\r\n";
+ request << "HTTPDATA";
+
+ TStringInput input(request.Str());
+ THttpInput httpInput(&input);
+
+ UNIT_ASSERT(httpInput.HasContent());
+ UNIT_ASSERT_VALUES_EQUAL(httpInput.ReadAll(), "HTTPDATA");
+ }
+
+ {
+ TStringStream request;
+ request << "POST / HTTP/1.1\r\n"
+ "Host: yandex.ru\r\n"
+ "Transfer-Encoding: chunked"
+ "\r\n\r\n";
+ request << "8\r\nHTTPDATA\r\n0\r\n";
+
+ TStringInput input(request.Str());
+ THttpInput httpInput(&input);
+
+ UNIT_ASSERT(httpInput.HasContent());
+ UNIT_ASSERT_VALUES_EQUAL(httpInput.ReadAll(), "HTTPDATA");
+ }
+ }
+
+ Y_UNIT_TEST(TestHttpInputHeadRequest) {
+ class THeadOnlyInput: public IInputStream {
+ public:
+ THeadOnlyInput() = default;
+
+ private:
+ size_t DoRead(void* buf, size_t len) override {
+ if (Eof_) {
+ ythrow yexception() << "should not read after EOF";
+ }
+
+ const size_t toWrite = Min(len, Data_.size() - Pos_);
+ if (toWrite == 0) {
+ Eof_ = true;
+ return 0;
+ }
+
+ memcpy(buf, Data_.data() + Pos_, toWrite);
+ Pos_ += toWrite;
+ return toWrite;
+ }
+
+ private:
+ TString Data_{TStringBuf("HEAD / HTTP/1.1\r\nHost: yandex.ru\r\n\r\n")};
+ size_t Pos_{0};
+ bool Eof_{false};
+ };
+ THeadOnlyInput input;
+ THttpInput httpInput(&input);
+
+ UNIT_ASSERT(!httpInput.HasContent());
+ UNIT_ASSERT_VALUES_EQUAL(httpInput.ReadAll(), "");
+ }
+
+ Y_UNIT_TEST(TestHttpOutputResponseToHeadRequestNoZeroChunk) {
+ TStringStream request;
+ request << "HEAD / HTTP/1.1\r\n"
+ "Host: yandex.ru\r\n"
+ "Connection: Keep-Alive\r\n"
+ "\r\n";
+
+ TStringInput input(request.Str());
+ THttpInput httpInput(&input);
+
+ TStringStream outBuf;
+ THttpOutput out(&outBuf, &httpInput);
+ out.EnableKeepAlive(true);
+ out << "HTTP/1.1 200 OK\r\nConnection: Keep-Alive\r\n\r\n";
+ out << "";
+ out.Finish();
+ TString result = outBuf.Str();
+ UNIT_ASSERT(!result.Contains(TStringBuf("0\r\n")));
+ }
+
+ Y_UNIT_TEST(TestHttpOutputDisableCompressionHeader) {
+ TMemoryInput request("GET / HTTP/1.1\r\nAccept-Encoding: gzip\r\n\r\n");
+ const TString data = "qqqqqqqqqqqqqqqqqqqqqqqqqqqqqq";
+
+ THttpInput httpInput(&request);
+ TString result;
+
+ {
+ TStringOutput output(result);
+ THttpOutput httpOutput(&output, &httpInput);
+ httpOutput.EnableCompressionHeader(false);
+ httpOutput << "HTTP/1.1 200 OK\r\n"
+ "content-encoding: gzip\r\n"
+ "\r\n" + data;
+ httpOutput.Finish();
+ }
+
+ UNIT_ASSERT(result.Contains("content-encoding: gzip"));
+ UNIT_ASSERT(result.Contains(data));
+ }
+
+ size_t DoTestHttpOutputSize(const TString& res, bool enableCompession) {
+ TTestHttpServer serverImpl(res);
+ TPortManager pm;
+
+ const ui16 port = pm.GetPort();
+ THttpServer server(&serverImpl,
+ THttpServer::TOptions(port)
+ .EnableKeepAlive(true)
+ .EnableCompression(enableCompession));
+ UNIT_ASSERT(server.Start());
+
+ TNetworkAddress addr("localhost", port);
+ TSocket s(addr);
+
+ {
+ TSocketOutput so(s);
+ THttpOutput out(&so);
+ out << "GET / HTTP/1.1\r\n"
+ "Host: www.yandex.ru\r\n"
+ "Connection: Keep-Alive\r\n"
+ "Accept-Encoding: gzip\r\n"
+ "\r\n";
+ out.Finish();
+ }
+
+ TSocketInput si(s);
+ THttpInput input(&si);
+
+ unsigned httpCode = ParseHttpRetCode(input.FirstLine());
+ UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u);
+
+ UNIT_ASSERT_VALUES_EQUAL(res, input.ReadAll());
+
+ server.Stop();
+
+ return serverImpl.LastRequestSentSize();
+ }
+
+ Y_UNIT_TEST(TestHttpOutputSize) {
+ TString res = "qqqqqq";
+ UNIT_ASSERT_VALUES_EQUAL(res.size(), DoTestHttpOutputSize(res, false));
+ UNIT_ASSERT_VALUES_UNEQUAL(res.size(), DoTestHttpOutputSize(res, true));
+ }
+} // THttpStreamTest suite
diff --git a/library/cpp/http/io/stream_ut_medium.cpp b/library/cpp/http/io/stream_ut_medium.cpp
new file mode 100644
index 0000000000..2c125eb21e
--- /dev/null
+++ b/library/cpp/http/io/stream_ut_medium.cpp
@@ -0,0 +1,54 @@
+#include "stream.h"
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/stream/zlib.h>
+
+Y_UNIT_TEST_SUITE(THttpTestMedium) {
+ Y_UNIT_TEST(TestCodings2) {
+ TStringBuf data = "aaaaaaaaaaaaaaaaaaaaaaa";
+
+ for (auto codec : SupportedCodings()) {
+ if (codec == TStringBuf("z-zlib-0")) {
+ continue;
+ }
+
+ if (codec == TStringBuf("z-null")) {
+ continue;
+ }
+
+ TString s;
+
+ {
+ TStringOutput so(s);
+ THttpOutput ho(&so);
+ TBufferedOutput bo(&ho, 10000);
+
+ bo << "HTTP/1.1 200 Ok\r\n"
+ << "Connection: close\r\n"
+ << "Content-Encoding: " << codec << "\r\n\r\n";
+
+ for (size_t i = 0; i < 100; ++i) {
+ bo << data;
+ }
+ }
+
+ try {
+ UNIT_ASSERT(s.size() > 10);
+ UNIT_ASSERT(s.find(data) == TString::npos);
+ } catch (...) {
+ Cerr << codec << " " << s << Endl;
+
+ throw;
+ }
+
+ {
+ TStringInput si(s);
+ THttpInput hi(&si);
+
+ auto res = hi.ReadAll();
+
+ UNIT_ASSERT(res.find(data) == 0);
+ }
+ }
+ }
+
+} // THttpTestMedium suite
diff --git a/library/cpp/http/io/ut/medium/ya.make b/library/cpp/http/io/ut/medium/ya.make
new file mode 100644
index 0000000000..235a23dcd7
--- /dev/null
+++ b/library/cpp/http/io/ut/medium/ya.make
@@ -0,0 +1,11 @@
+UNITTEST_FOR(library/cpp/http/io)
+
+SIZE(MEDIUM)
+
+OWNER(g:util)
+
+SRCS(
+ stream_ut_medium.cpp
+)
+
+END()
diff --git a/library/cpp/http/io/ut/ya.make b/library/cpp/http/io/ut/ya.make
new file mode 100644
index 0000000000..84f6949db3
--- /dev/null
+++ b/library/cpp/http/io/ut/ya.make
@@ -0,0 +1,16 @@
+UNITTEST_FOR(library/cpp/http/io)
+
+OWNER(g:util)
+
+PEERDIR(
+ library/cpp/http/server
+)
+
+SRCS(
+ chunk_ut.cpp
+ compression_ut.cpp
+ headers_ut.cpp
+ stream_ut.cpp
+)
+
+END()
diff --git a/library/cpp/http/io/ya.make b/library/cpp/http/io/ya.make
new file mode 100644
index 0000000000..dcfbd79885
--- /dev/null
+++ b/library/cpp/http/io/ya.make
@@ -0,0 +1,22 @@
+LIBRARY()
+
+OWNER(
+ g:util
+ mvel
+)
+
+PEERDIR(
+ library/cpp/blockcodecs
+ library/cpp/streams/brotli
+ library/cpp/streams/bzip2
+ library/cpp/streams/lzma
+)
+
+SRCS(
+ chunk.cpp
+ compression.cpp
+ headers.cpp
+ stream.cpp
+)
+
+END()