aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/http/io/stream.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/http/io/stream.cpp
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/http/io/stream.cpp')
-rw-r--r--library/cpp/http/io/stream.cpp1302
1 files changed, 651 insertions, 651 deletions
diff --git a/library/cpp/http/io/stream.cpp b/library/cpp/http/io/stream.cpp
index 6689be684f..c38faffe0b 100644
--- a/library/cpp/http/io/stream.cpp
+++ b/library/cpp/http/io/stream.cpp
@@ -1,156 +1,156 @@
-#include "stream.h"
+#include "stream.h"
#include "compression.h"
-#include "chunk.h"
-
+#include "chunk.h"
+
#include <util/stream/buffered.h>
-#include <util/stream/length.h>
+#include <util/stream/length.h>
#include <util/stream/multi.h>
#include <util/stream/null.h>
#include <util/stream/tee.h>
-
+
#include <util/system/compat.h>
-#include <util/system/yassert.h>
-
-#include <util/network/socket.h>
-
-#include <util/string/cast.h>
+#include <util/system/yassert.h>
+
+#include <util/network/socket.h>
+
+#include <util/string/cast.h>
#include <util/string/strip.h>
-
+
#include <util/generic/string.h>
-#include <util/generic/utility.h>
-#include <util/generic/hash_set.h>
-#include <util/generic/yexception.h>
-
-#define HEADERCMP(header, str) \
- case sizeof(str) - 1: \
+#include <util/generic/utility.h>
+#include <util/generic/hash_set.h>
+#include <util/generic/yexception.h>
+
+#define HEADERCMP(header, str) \
+ case sizeof(str) - 1: \
if (!stricmp((header).Name().data(), str))
-namespace {
+namespace {
inline size_t SuggestBufferSize() {
- return 8192;
- }
-
+ return 8192;
+ }
+
inline TStringBuf Trim(const char* b, const char* e) noexcept {
- return StripString(TStringBuf(b, e));
- }
-
+ return StripString(TStringBuf(b, e));
+ }
+
inline TStringBuf RmSemiColon(const TStringBuf& s) {
- return s.Before(';');
- }
-
- template <class T, size_t N>
- class TStreams: private TNonCopyable {
- struct TDelete {
- inline void operator()(T* t) noexcept {
- delete t;
- }
- };
-
- typedef T* TPtr;
-
- public:
- inline TStreams() noexcept
- : Beg_(T_ + N)
- {
- }
-
- inline ~TStreams() {
- TDelete f;
-
- ForEach(f);
- }
-
- template <class S>
- inline S* Add(S* t) noexcept {
- return (S*)AddImpl((T*)t);
- }
-
- template <class Functor>
- inline void ForEach(Functor& f) {
- const TPtr* end = T_ + N;
-
- for (TPtr* cur = Beg_; cur != end; ++cur) {
- f(*cur);
- }
- }
-
- TPtr Top() {
- const TPtr* end = T_ + N;
- return end == Beg_ ? nullptr : *Beg_;
- }
-
- private:
- inline T* AddImpl(T* t) noexcept {
- Y_ASSERT(Beg_ > T_);
-
- return (*--Beg_ = t);
- }
-
- private:
- TPtr T_[N];
- TPtr* Beg_;
- };
-
- template <class TStream>
- class TLazy: public IOutputStream {
- public:
- TLazy(IOutputStream* out, ui16 bs)
- : Output_(out)
- , BlockSize_(bs)
- {
- }
-
- void DoWrite(const void* buf, size_t len) override {
- ConstructSlave();
- Slave_->Write(buf, len);
- }
-
- void DoFlush() override {
- ConstructSlave();
- Slave_->Flush();
- }
-
- void DoFinish() override {
- ConstructSlave();
- Slave_->Finish();
- }
-
- private:
- inline void ConstructSlave() {
- if (!Slave_) {
- Slave_.Reset(new TStream(Output_, BlockSize_));
- }
- }
-
- private:
- IOutputStream* Output_;
- ui16 BlockSize_;
- THolder<IOutputStream> Slave_;
- };
-}
-
-class THttpInput::TImpl {
+ return s.Before(';');
+ }
+
+ template <class T, size_t N>
+ class TStreams: private TNonCopyable {
+ struct TDelete {
+ inline void operator()(T* t) noexcept {
+ delete t;
+ }
+ };
+
+ typedef T* TPtr;
+
+ public:
+ inline TStreams() noexcept
+ : Beg_(T_ + N)
+ {
+ }
+
+ inline ~TStreams() {
+ TDelete f;
+
+ ForEach(f);
+ }
+
+ template <class S>
+ inline S* Add(S* t) noexcept {
+ return (S*)AddImpl((T*)t);
+ }
+
+ template <class Functor>
+ inline void ForEach(Functor& f) {
+ const TPtr* end = T_ + N;
+
+ for (TPtr* cur = Beg_; cur != end; ++cur) {
+ f(*cur);
+ }
+ }
+
+ TPtr Top() {
+ const TPtr* end = T_ + N;
+ return end == Beg_ ? nullptr : *Beg_;
+ }
+
+ private:
+ inline T* AddImpl(T* t) noexcept {
+ Y_ASSERT(Beg_ > T_);
+
+ return (*--Beg_ = t);
+ }
+
+ private:
+ TPtr T_[N];
+ TPtr* Beg_;
+ };
+
+ template <class TStream>
+ class TLazy: public IOutputStream {
+ public:
+ TLazy(IOutputStream* out, ui16 bs)
+ : Output_(out)
+ , BlockSize_(bs)
+ {
+ }
+
+ void DoWrite(const void* buf, size_t len) override {
+ ConstructSlave();
+ Slave_->Write(buf, len);
+ }
+
+ void DoFlush() override {
+ ConstructSlave();
+ Slave_->Flush();
+ }
+
+ void DoFinish() override {
+ ConstructSlave();
+ Slave_->Finish();
+ }
+
+ private:
+ inline void ConstructSlave() {
+ if (!Slave_) {
+ Slave_.Reset(new TStream(Output_, BlockSize_));
+ }
+ }
+
+ private:
+ IOutputStream* Output_;
+ ui16 BlockSize_;
+ THolder<IOutputStream> Slave_;
+ };
+}
+
+class THttpInput::TImpl {
typedef THashSet<TString> TAcceptCodings;
-
-public:
+
+public:
inline TImpl(IInputStream* slave)
- : Slave_(slave)
- , Buffered_(Slave_, SuggestBufferSize())
+ : Slave_(slave)
+ , Buffered_(Slave_, SuggestBufferSize())
, ChunkedInput_(nullptr)
, Input_(nullptr)
, FirstLine_(ReadFirstLine(Buffered_))
- , Headers_(&Buffered_)
- , KeepAlive_(false)
- , HasContentLength_(false)
- , ContentLength_(0)
- , ContentEncoded_(false)
+ , Headers_(&Buffered_)
+ , KeepAlive_(false)
+ , HasContentLength_(false)
+ , ContentLength_(0)
+ , ContentEncoded_(false)
, Expect100Continue_(false)
- {
- BuildInputChain();
+ {
+ BuildInputChain();
Y_ASSERT(Input_);
- }
-
+ }
+
static TString ReadFirstLine(TBufferedInput& in) {
TString s;
Y_ENSURE_EX(in.ReadLine(s), THttpReadException() << "Failed to get first line");
@@ -158,48 +158,48 @@ public:
}
inline ~TImpl() {
- }
-
- inline size_t Read(void* buf, size_t len) {
+ }
+
+ inline size_t Read(void* buf, size_t len) {
return Perform(len, [this, buf](size_t toRead) { return Input_->Read(buf, toRead); });
- }
+ }
inline size_t Skip(size_t len) {
return Perform(len, [this](size_t toSkip) { return Input_->Skip(toSkip); });
- }
-
+ }
+
inline const TString& FirstLine() const noexcept {
- return FirstLine_;
- }
-
+ return FirstLine_;
+ }
+
inline const THttpHeaders& Headers() const noexcept {
- return Headers_;
- }
-
+ return Headers_;
+ }
+
inline const TMaybe<THttpHeaders>& Trailers() const noexcept {
return Trailers_;
}
inline bool IsKeepAlive() const noexcept {
- return KeepAlive_;
- }
-
+ return KeepAlive_;
+ }
+
inline bool AcceptEncoding(const TString& s) const {
- return Codings_.find(to_lower(s)) != Codings_.end();
- }
-
+ return Codings_.find(to_lower(s)) != Codings_.end();
+ }
+
inline bool GetContentLength(ui64& value) const noexcept {
- if (HasContentLength_) {
- value = ContentLength_;
- return true;
+ if (HasContentLength_) {
+ value = ContentLength_;
+ return true;
}
- return false;
- }
+ return false;
+ }
inline bool ContentEncoded() const noexcept {
- return ContentEncoded_;
- }
-
+ return ContentEncoded_;
+ }
+
inline bool HasContent() const noexcept {
return HasContentLength_ || ChunkedInput_;
}
@@ -208,8 +208,8 @@ public:
return Expect100Continue_;
}
-private:
- template <class Operation>
+private:
+ template <class Operation>
inline size_t Perform(size_t len, const Operation& operation) {
size_t processed = operation(len);
if (processed == 0 && len > 0) {
@@ -227,52 +227,52 @@ private:
return processed;
}
- struct TParsedHeaders {
+ struct TParsedHeaders {
bool Chunked = false;
bool KeepAlive = false;
- TStringBuf LZipped;
- };
-
- struct TTrEnc {
- inline void operator()(const TStringBuf& s) {
+ TStringBuf LZipped;
+ };
+
+ struct TTrEnc {
+ inline void operator()(const TStringBuf& s) {
if (s == TStringBuf("chunked")) {
p->Chunked = true;
- }
- }
-
- TParsedHeaders* p;
- };
-
- struct TAccCoding {
- inline void operator()(const TStringBuf& s) {
- c->insert(ToString(s));
- }
-
- TAcceptCodings* c;
- };
-
- template <class Functor>
+ }
+ }
+
+ TParsedHeaders* p;
+ };
+
+ struct TAccCoding {
+ inline void operator()(const TStringBuf& s) {
+ c->insert(ToString(s));
+ }
+
+ TAcceptCodings* c;
+ };
+
+ template <class Functor>
inline void ForEach(TString in, Functor& f) {
- in.to_lower();
-
- const char* b = in.begin();
- const char* c = b;
- const char* e = in.end();
-
- while (c != e) {
- if (*c == ',') {
- f(RmSemiColon(Trim(b, c)));
- b = c + 1;
- }
-
- ++c;
- }
-
- if (b != c) {
- f(RmSemiColon(Trim(b, c)));
- }
- }
-
+ in.to_lower();
+
+ const char* b = in.begin();
+ const char* c = b;
+ const char* e = in.end();
+
+ while (c != e) {
+ if (*c == ',') {
+ f(RmSemiColon(Trim(b, c)));
+ b = c + 1;
+ }
+
+ ++c;
+ }
+
+ if (b != c) {
+ f(RmSemiColon(Trim(b, c)));
+ }
+ }
+
inline bool IsRequest() const {
return strnicmp(FirstLine().data(), "get", 3) == 0 ||
strnicmp(FirstLine().data(), "post", 4) == 0 ||
@@ -282,47 +282,47 @@ private:
strnicmp(FirstLine().data(), "delete", 6) == 0;
}
- inline void BuildInputChain() {
- TParsedHeaders p;
+ inline void BuildInputChain() {
+ TParsedHeaders p;
- size_t pos = FirstLine_.rfind(' ');
+ size_t pos = FirstLine_.rfind(' ');
// In HTTP/1.1 Keep-Alive is turned on by default
if (pos != TString::npos && strcmp(FirstLine_.c_str() + pos + 1, "HTTP/1.1") == 0) {
- p.KeepAlive = true; //request
+ p.KeepAlive = true; //request
} else if (strnicmp(FirstLine_.data(), "HTTP/1.1", 8) == 0) {
- p.KeepAlive = true; //reply
- }
-
- for (THttpHeaders::TConstIterator h = Headers_.Begin(); h != Headers_.End(); ++h) {
- const THttpInputHeader& header = *h;
+ p.KeepAlive = true; //reply
+ }
+
+ for (THttpHeaders::TConstIterator h = Headers_.Begin(); h != Headers_.End(); ++h) {
+ const THttpInputHeader& header = *h;
switch (header.Name().size()) {
- HEADERCMP(header, "transfer-encoding") {
- TTrEnc f = {&p};
- ForEach(header.Value(), f);
- }
- break;
- HEADERCMP(header, "content-encoding") {
- p.LZipped = header.Value();
- }
- break;
- HEADERCMP(header, "accept-encoding") {
- TAccCoding f = {&Codings_};
- ForEach(header.Value(), f);
- }
- break;
- HEADERCMP(header, "content-length") {
- HasContentLength_ = true;
- ContentLength_ = FromString(header.Value());
- }
- break;
- HEADERCMP(header, "connection") {
- // accept header "Connection: Keep-Alive, TE"
+ HEADERCMP(header, "transfer-encoding") {
+ TTrEnc f = {&p};
+ ForEach(header.Value(), f);
+ }
+ break;
+ HEADERCMP(header, "content-encoding") {
+ p.LZipped = header.Value();
+ }
+ break;
+ HEADERCMP(header, "accept-encoding") {
+ TAccCoding f = {&Codings_};
+ ForEach(header.Value(), f);
+ }
+ break;
+ HEADERCMP(header, "content-length") {
+ HasContentLength_ = true;
+ ContentLength_ = FromString(header.Value());
+ }
+ break;
+ HEADERCMP(header, "connection") {
+ // accept header "Connection: Keep-Alive, TE"
if (strnicmp(header.Value().data(), "keep-alive", 10) == 0) {
p.KeepAlive = true;
} else if (stricmp(header.Value().data(), "close") == 0) {
p.KeepAlive = false;
}
- }
+ }
[[fallthrough]];
HEADERCMP(header, "expect") {
auto findContinue = [&](const TStringBuf& s) {
@@ -332,101 +332,101 @@ private:
};
ForEach(header.Value(), findContinue);
}
- break;
- }
- }
-
+ break;
+ }
+ }
+
if (p.Chunked) {
ChunkedInput_ = Streams_.Add(new TChunkedInput(&Buffered_, &Trailers_));
Input_ = ChunkedInput_;
- } else {
+ } else {
// disable buffering
- Buffered_.Reset(&Cnull);
- Input_ = Streams_.Add(new TMultiInput(&Buffered_, Slave_));
-
+ Buffered_.Reset(&Cnull);
+ Input_ = Streams_.Add(new TMultiInput(&Buffered_, Slave_));
+
if (IsRequest() || HasContentLength_) {
- /*
- * TODO - we have other cases
- */
- Input_ = Streams_.Add(new TLengthLimitedInput(Input_, ContentLength_));
- }
- }
-
+ /*
+ * TODO - we have other cases
+ */
+ Input_ = Streams_.Add(new TLengthLimitedInput(Input_, ContentLength_));
+ }
+ }
+
if (auto decoder = TCompressionCodecFactory::Instance().FindDecoder(p.LZipped)) {
- ContentEncoded_ = true;
+ ContentEncoded_ = true;
Input_ = Streams_.Add((*decoder)(Input_).Release());
- }
-
+ }
+
KeepAlive_ = p.KeepAlive;
- }
-
-private:
+ }
+
+private:
IInputStream* Slave_;
-
- /*
- * input helpers
- */
- TBufferedInput Buffered_;
+
+ /*
+ * input helpers
+ */
+ TBufferedInput Buffered_;
TStreams<IInputStream, 8> Streams_;
IInputStream* ChunkedInput_;
-
- /*
- * final input stream
- */
+
+ /*
+ * final input stream
+ */
IInputStream* Input_;
-
+
TString FirstLine_;
- THttpHeaders Headers_;
+ THttpHeaders Headers_;
TMaybe<THttpHeaders> Trailers_;
- bool KeepAlive_;
+ bool KeepAlive_;
+
+ TAcceptCodings Codings_;
- TAcceptCodings Codings_;
+ bool HasContentLength_;
+ ui64 ContentLength_;
- bool HasContentLength_;
- ui64 ContentLength_;
-
- bool ContentEncoded_;
+ bool ContentEncoded_;
bool Expect100Continue_;
-};
-
+};
+
THttpInput::THttpInput(IInputStream* slave)
- : Impl_(new TImpl(slave))
-{
-}
-
+ : Impl_(new TImpl(slave))
+{
+}
+
THttpInput::THttpInput(THttpInput&& httpInput) = default;
THttpInput::~THttpInput() {
-}
-
-size_t THttpInput::DoRead(void* buf, size_t len) {
- return Impl_->Read(buf, len);
-}
-
+}
+
+size_t THttpInput::DoRead(void* buf, size_t len) {
+ return Impl_->Read(buf, len);
+}
+
size_t THttpInput::DoSkip(size_t len) {
return Impl_->Skip(len);
}
const THttpHeaders& THttpInput::Headers() const noexcept {
- return Impl_->Headers();
-}
-
+ return Impl_->Headers();
+}
+
const TMaybe<THttpHeaders>& THttpInput::Trailers() const noexcept {
return Impl_->Trailers();
}
const TString& THttpInput::FirstLine() const noexcept {
- return Impl_->FirstLine();
-}
-
+ return Impl_->FirstLine();
+}
+
bool THttpInput::IsKeepAlive() const noexcept {
- return Impl_->IsKeepAlive();
-}
-
+ return Impl_->IsKeepAlive();
+}
+
bool THttpInput::AcceptEncoding(const TString& coding) const {
- return Impl_->AcceptEncoding(coding);
-}
-
+ return Impl_->AcceptEncoding(coding);
+}
+
TString THttpInput::BestCompressionScheme(TArrayRef<const TStringBuf> codings) const {
return NHttp::ChooseBestCompressionScheme(
[this](const TString& coding) {
@@ -434,8 +434,8 @@ TString THttpInput::BestCompressionScheme(TArrayRef<const TStringBuf> codings) c
},
codings
);
-}
-
+}
+
TString THttpInput::BestCompressionScheme() const {
return BestCompressionScheme(TCompressionCodecFactory::Instance().GetBestCodecs());
}
@@ -456,141 +456,141 @@ bool THttpInput::HasExpect100Continue() const noexcept {
return Impl_->HasExpect100Continue();
}
-class THttpOutput::TImpl {
+class THttpOutput::TImpl {
class TSizeCalculator: public IOutputStream {
- public:
+ public:
inline TSizeCalculator() noexcept {
- }
-
+ }
+
~TSizeCalculator() override {
- }
-
+ }
+
void DoWrite(const void* /*buf*/, size_t len) override {
- Length_ += len;
- }
-
+ Length_ += len;
+ }
+
inline size_t Length() const noexcept {
- return Length_;
- }
-
- private:
+ return Length_;
+ }
+
+ private:
size_t Length_ = 0;
- };
-
- enum TState {
- Begin = 0,
- FirstLineSent = 1,
- HeadersSent = 2
- };
-
- struct TFlush {
+ };
+
+ enum TState {
+ Begin = 0,
+ FirstLineSent = 1,
+ HeadersSent = 2
+ };
+
+ struct TFlush {
inline void operator()(IOutputStream* s) {
- s->Flush();
- }
- };
-
- struct TFinish {
+ s->Flush();
+ }
+ };
+
+ struct TFinish {
inline void operator()(IOutputStream* s) {
- s->Finish();
- }
- };
-
-public:
+ s->Finish();
+ }
+ };
+
+public:
inline TImpl(IOutputStream* slave, THttpInput* request)
- : Slave_(slave)
- , State_(Begin)
- , Output_(Slave_)
- , Request_(request)
- , Version_(1100)
- , KeepAliveEnabled_(false)
+ : Slave_(slave)
+ , State_(Begin)
+ , Output_(Slave_)
+ , Request_(request)
+ , Version_(1100)
+ , KeepAliveEnabled_(false)
, BodyEncodingEnabled_(true)
, CompressionHeaderEnabled_(true)
- , Finished_(false)
- {
- }
-
+ , Finished_(false)
+ {
+ }
+
inline ~TImpl() {
- }
-
+ }
+
inline void SendContinue() {
Output_->Write("HTTP/1.1 100 Continue\r\n\r\n");
Output_->Flush();
}
- inline void Write(const void* buf, size_t len) {
- if (Finished_) {
- ythrow THttpException() << "can not write to finished stream";
- }
-
- if (State_ == HeadersSent) {
- Output_->Write(buf, len);
-
- return;
- }
-
- const char* b = (const char*)buf;
- const char* e = b + len;
- const char* c = b;
-
- while (c != e) {
- if (*c == '\n') {
- Line_.append(b, c);
-
+ inline void Write(const void* buf, size_t len) {
+ if (Finished_) {
+ ythrow THttpException() << "can not write to finished stream";
+ }
+
+ if (State_ == HeadersSent) {
+ Output_->Write(buf, len);
+
+ return;
+ }
+
+ const char* b = (const char*)buf;
+ const char* e = b + len;
+ const char* c = b;
+
+ while (c != e) {
+ if (*c == '\n') {
+ Line_.append(b, c);
+
if (!Line_.empty() && Line_.back() == '\r') {
- Line_.pop_back();
- }
-
- b = c + 1;
-
- Process(Line_);
-
- if (State_ == HeadersSent) {
- Output_->Write(b, e - b);
-
- return;
- }
-
- Line_.clear();
- }
-
- ++c;
- }
-
- if (b != c) {
- Line_.append(b, c);
- }
- }
-
- inline void Flush() {
- TFlush f;
- Streams_.ForEach(f);
+ Line_.pop_back();
+ }
+
+ b = c + 1;
+
+ Process(Line_);
+
+ if (State_ == HeadersSent) {
+ Output_->Write(b, e - b);
+
+ return;
+ }
+
+ Line_.clear();
+ }
+
+ ++c;
+ }
+
+ if (b != c) {
+ Line_.append(b, c);
+ }
+ }
+
+ inline void Flush() {
+ TFlush f;
+ Streams_.ForEach(f);
Slave_->Flush(); // see SEARCH-1030
- }
-
- inline void Finish() {
- if (Finished_) {
- return;
- }
-
- TFinish f;
- Streams_.ForEach(f);
+ }
+
+ inline void Finish() {
+ if (Finished_) {
+ return;
+ }
+
+ TFinish f;
+ Streams_.ForEach(f);
Slave_->Finish(); // see SEARCH-1030
-
- Finished_ = true;
- }
-
+
+ Finished_ = true;
+ }
+
inline const THttpHeaders& SentHeaders() const noexcept {
- return Headers_;
- }
-
+ return Headers_;
+ }
+
inline void EnableCompression(TArrayRef<const TStringBuf> schemas) {
- ComprSchemas_ = schemas;
- }
-
- inline void EnableKeepAlive(bool enable) {
- KeepAliveEnabled_ = enable;
- }
-
+ ComprSchemas_ = schemas;
+ }
+
+ inline void EnableKeepAlive(bool enable) {
+ KeepAliveEnabled_ = enable;
+ }
+
inline void EnableBodyEncoding(bool enable) {
BodyEncodingEnabled_ = enable;
}
@@ -601,12 +601,12 @@ public:
inline bool IsCompressionEnabled() const noexcept {
return !ComprSchemas_.empty();
- }
-
+ }
+
inline bool IsKeepAliveEnabled() const noexcept {
- return KeepAliveEnabled_;
- }
-
+ return KeepAliveEnabled_;
+ }
+
inline bool IsBodyEncodingEnabled() const noexcept {
return BodyEncodingEnabled_;
}
@@ -616,9 +616,9 @@ public:
}
inline bool CanBeKeepAlive() const noexcept {
- return SupportChunkedTransfer() && IsKeepAliveEnabled() && (Request_ ? Request_->IsKeepAlive() : true);
- }
-
+ return SupportChunkedTransfer() && IsKeepAliveEnabled() && (Request_ ? Request_->IsKeepAlive() : true);
+ }
+
inline const TString& FirstLine() const noexcept {
return FirstLine_;
}
@@ -627,18 +627,18 @@ public:
return SizeCalculator_.Length();
}
-private:
+private:
static inline bool IsResponse(const TString& s) noexcept {
return strnicmp(s.data(), "HTTP/", 5) == 0;
- }
-
+ }
+
static inline bool IsRequest(const TString& s) noexcept {
- return !IsResponse(s);
- }
-
+ return !IsResponse(s);
+ }
+
inline bool IsHttpRequest() const noexcept {
- return IsRequest(FirstLine_);
- }
+ return IsRequest(FirstLine_);
+ }
inline bool HasResponseBody() const noexcept {
if (IsHttpResponse()) {
@@ -652,169 +652,169 @@ private:
}
inline bool IsHttpResponse() const noexcept {
- return IsResponse(FirstLine_);
- }
-
+ return IsResponse(FirstLine_);
+ }
+
inline bool HasRequestBody() const noexcept {
return strnicmp(FirstLine_.data(), "POST", 4) == 0 ||
strnicmp(FirstLine_.data(), "PATCH", 5) == 0 ||
strnicmp(FirstLine_.data(), "PUT", 3) == 0;
- }
+ }
static inline size_t ParseHttpVersion(const TString& s) {
- if (s.empty()) {
- ythrow THttpParseException() << "malformed http stream";
+ if (s.empty()) {
+ ythrow THttpParseException() << "malformed http stream";
}
-
- size_t parsed_version = 0;
-
- if (IsResponse(s)) {
+
+ size_t parsed_version = 0;
+
+ if (IsResponse(s)) {
const char* b = s.data() + 5;
-
- while (*b && *b != ' ') {
- if (*b != '.') {
- parsed_version *= 10;
- parsed_version += (*b - '0');
- }
-
- ++b;
- }
- } else {
- /*
- * s not empty here
- */
- const char* e = s.end() - 1;
- const char* b = s.begin();
- size_t mult = 1;
-
- while (e != b && *e != '/') {
- if (*e != '.') {
- parsed_version += (*e - '0') * mult;
- mult *= 10;
- }
-
- --e;
- }
- }
-
- return parsed_version * 100;
- }
-
- inline void ParseHttpVersion() {
- size_t parsed_version = ParseHttpVersion(FirstLine_);
-
- if (Request_) {
- parsed_version = Min(parsed_version, ParseHttpVersion(Request_->FirstLine()));
- }
-
- Version_ = parsed_version;
- }
-
+
+ while (*b && *b != ' ') {
+ if (*b != '.') {
+ parsed_version *= 10;
+ parsed_version += (*b - '0');
+ }
+
+ ++b;
+ }
+ } else {
+ /*
+ * s not empty here
+ */
+ const char* e = s.end() - 1;
+ const char* b = s.begin();
+ size_t mult = 1;
+
+ while (e != b && *e != '/') {
+ if (*e != '.') {
+ parsed_version += (*e - '0') * mult;
+ mult *= 10;
+ }
+
+ --e;
+ }
+ }
+
+ return parsed_version * 100;
+ }
+
+ inline void ParseHttpVersion() {
+ size_t parsed_version = ParseHttpVersion(FirstLine_);
+
+ if (Request_) {
+ parsed_version = Min(parsed_version, ParseHttpVersion(Request_->FirstLine()));
+ }
+
+ Version_ = parsed_version;
+ }
+
inline void Process(const TString& s) {
Y_ASSERT(State_ != HeadersSent);
-
- if (State_ == Begin) {
- FirstLine_ = s;
- ParseHttpVersion();
- State_ = FirstLineSent;
- } else {
- if (s.empty()) {
- BuildOutputStream();
- WriteCached();
- State_ = HeadersSent;
- } else {
+
+ if (State_ == Begin) {
+ FirstLine_ = s;
+ ParseHttpVersion();
+ State_ = FirstLineSent;
+ } else {
+ if (s.empty()) {
+ BuildOutputStream();
+ WriteCached();
+ State_ = HeadersSent;
+ } else {
AddHeader(THttpInputHeader(s));
- }
- }
- }
-
+ }
+ }
+ }
+
inline void WriteCachedImpl(IOutputStream* s) const {
s->Write(FirstLine_.data(), FirstLine_.size());
- s->Write("\r\n", 2);
- Headers_.OutTo(s);
- s->Write("\r\n", 2);
- s->Finish();
- }
-
- inline void WriteCached() {
- size_t buflen = 0;
-
- {
- TSizeCalculator out;
-
- WriteCachedImpl(&out);
- buflen = out.Length();
- }
-
- {
- TBufferedOutput out(Slave_, buflen);
-
- WriteCachedImpl(&out);
- }
-
- if (IsHttpRequest() && !HasRequestBody()) {
- /*
- * if this is http request, then send it now
- */
-
- Slave_->Flush();
- }
- }
-
+ s->Write("\r\n", 2);
+ Headers_.OutTo(s);
+ s->Write("\r\n", 2);
+ s->Finish();
+ }
+
+ inline void WriteCached() {
+ size_t buflen = 0;
+
+ {
+ TSizeCalculator out;
+
+ WriteCachedImpl(&out);
+ buflen = out.Length();
+ }
+
+ {
+ TBufferedOutput out(Slave_, buflen);
+
+ WriteCachedImpl(&out);
+ }
+
+ if (IsHttpRequest() && !HasRequestBody()) {
+ /*
+ * if this is http request, then send it now
+ */
+
+ Slave_->Flush();
+ }
+ }
+
inline bool SupportChunkedTransfer() const noexcept {
- return Version_ >= 1100;
- }
-
- inline void BuildOutputStream() {
- if (CanBeKeepAlive()) {
- AddOrReplaceHeader(THttpInputHeader("Connection", "Keep-Alive"));
- } else {
- AddOrReplaceHeader(THttpInputHeader("Connection", "Close"));
- }
-
- if (IsHttpResponse()) {
+ return Version_ >= 1100;
+ }
+
+ inline void BuildOutputStream() {
+ if (CanBeKeepAlive()) {
+ AddOrReplaceHeader(THttpInputHeader("Connection", "Keep-Alive"));
+ } else {
+ AddOrReplaceHeader(THttpInputHeader("Connection", "Close"));
+ }
+
+ if (IsHttpResponse()) {
if (Request_ && IsCompressionEnabled() && HasResponseBody()) {
TString scheme = Request_->BestCompressionScheme(ComprSchemas_);
- if (scheme != "identity") {
- AddOrReplaceHeader(THttpInputHeader("Content-Encoding", scheme));
+ if (scheme != "identity") {
+ AddOrReplaceHeader(THttpInputHeader("Content-Encoding", scheme));
RemoveHeader("Content-Length");
- }
- }
-
- RebuildStream();
- } else {
- if (IsCompressionEnabled()) {
- AddOrReplaceHeader(THttpInputHeader("Accept-Encoding", BuildAcceptEncoding()));
- }
- if (HasRequestBody()) {
- RebuildStream();
- }
- }
- }
-
+ }
+ }
+
+ RebuildStream();
+ } else {
+ if (IsCompressionEnabled()) {
+ AddOrReplaceHeader(THttpInputHeader("Accept-Encoding", BuildAcceptEncoding()));
+ }
+ if (HasRequestBody()) {
+ RebuildStream();
+ }
+ }
+ }
+
inline TString BuildAcceptEncoding() const {
TString ret;
-
+
for (const auto& coding : ComprSchemas_) {
- if (ret) {
- ret += ", ";
- }
-
+ if (ret) {
+ ret += ", ";
+ }
+
ret += coding;
- }
-
- return ret;
- }
+ }
+
+ return ret;
+ }
- inline void RebuildStream() {
+ inline void RebuildStream() {
bool keepAlive = false;
const TCompressionCodecFactory::TEncoderConstructor* encoder = nullptr;
- bool chunked = false;
+ bool chunked = false;
bool haveContentLength = false;
-
- for (THttpHeaders::TConstIterator h = Headers_.Begin(); h != Headers_.End(); ++h) {
- const THttpInputHeader& header = *h;
+
+ for (THttpHeaders::TConstIterator h = Headers_.Begin(); h != Headers_.End(); ++h) {
+ const THttpInputHeader& header = *h;
const TString hl = to_lower(header.Name());
-
+
if (hl == TStringBuf("connection")) {
keepAlive = to_lower(header.Value()) == TStringBuf("keep-alive");
} else if (IsCompressionHeaderEnabled() && hl == TStringBuf("content-encoding")) {
@@ -823,109 +823,109 @@ private:
chunked = to_lower(header.Value()) == TStringBuf("chunked");
} else if (hl == TStringBuf("content-length")) {
haveContentLength = true;
- }
- }
-
+ }
+ }
+
if (!haveContentLength && !chunked && (IsHttpRequest() || HasResponseBody()) && SupportChunkedTransfer() && (keepAlive || encoder || IsHttpRequest())) {
- AddHeader(THttpInputHeader("Transfer-Encoding", "chunked"));
- chunked = true;
- }
-
+ AddHeader(THttpInputHeader("Transfer-Encoding", "chunked"));
+ chunked = true;
+ }
+
if (IsBodyEncodingEnabled() && chunked) {
- Output_ = Streams_.Add(new TChunkedOutput(Output_));
- }
-
+ Output_ = Streams_.Add(new TChunkedOutput(Output_));
+ }
+
Output_ = Streams_.Add(new TTeeOutput(Output_, &SizeCalculator_));
if (IsBodyEncodingEnabled() && encoder) {
Output_ = Streams_.Add((*encoder)(Output_).Release());
- }
- }
-
- inline void AddHeader(const THttpInputHeader& hdr) {
- Headers_.AddHeader(hdr);
- }
-
- inline void AddOrReplaceHeader(const THttpInputHeader& hdr) {
- Headers_.AddOrReplaceHeader(hdr);
- }
-
+ }
+ }
+
+ inline void AddHeader(const THttpInputHeader& hdr) {
+ Headers_.AddHeader(hdr);
+ }
+
+ inline void AddOrReplaceHeader(const THttpInputHeader& hdr) {
+ Headers_.AddOrReplaceHeader(hdr);
+ }
+
inline void RemoveHeader(const TString& hdr) {
Headers_.RemoveHeader(hdr);
}
-private:
+private:
IOutputStream* Slave_;
- TState State_;
+ TState State_;
IOutputStream* Output_;
TStreams<IOutputStream, 8> Streams_;
TString Line_;
TString FirstLine_;
- THttpHeaders Headers_;
- THttpInput* Request_;
- size_t Version_;
-
+ THttpHeaders Headers_;
+ THttpInput* Request_;
+ size_t Version_;
+
TArrayRef<const TStringBuf> ComprSchemas_;
-
- bool KeepAliveEnabled_;
+
+ bool KeepAliveEnabled_;
bool BodyEncodingEnabled_;
bool CompressionHeaderEnabled_;
- bool Finished_;
+ bool Finished_;
TSizeCalculator SizeCalculator_;
-};
-
+};
+
THttpOutput::THttpOutput(IOutputStream* slave)
: Impl_(new TImpl(slave, nullptr))
-{
-}
-
+{
+}
+
THttpOutput::THttpOutput(IOutputStream* slave, THttpInput* request)
- : Impl_(new TImpl(slave, request))
-{
-}
-
+ : Impl_(new TImpl(slave, request))
+{
+}
+
THttpOutput::~THttpOutput() {
- try {
- Finish();
- } catch (...) {
- }
-}
-
-void THttpOutput::DoWrite(const void* buf, size_t len) {
- Impl_->Write(buf, len);
-}
-
-void THttpOutput::DoFlush() {
- Impl_->Flush();
-}
-
-void THttpOutput::DoFinish() {
- Impl_->Finish();
-}
-
+ try {
+ Finish();
+ } catch (...) {
+ }
+}
+
+void THttpOutput::DoWrite(const void* buf, size_t len) {
+ Impl_->Write(buf, len);
+}
+
+void THttpOutput::DoFlush() {
+ Impl_->Flush();
+}
+
+void THttpOutput::DoFinish() {
+ Impl_->Finish();
+}
+
const THttpHeaders& THttpOutput::SentHeaders() const noexcept {
return Impl_->SentHeaders();
-}
-
-void THttpOutput::EnableCompression(bool enable) {
- if (enable) {
+}
+
+void THttpOutput::EnableCompression(bool enable) {
+ if (enable) {
EnableCompression(TCompressionCodecFactory::Instance().GetBestCodecs());
- } else {
+ } else {
TArrayRef<TStringBuf> codings;
EnableCompression(codings);
- }
-}
-
+ }
+}
+
void THttpOutput::EnableCompression(TArrayRef<const TStringBuf> schemas) {
Impl_->EnableCompression(schemas);
-}
-
-void THttpOutput::EnableKeepAlive(bool enable) {
- Impl_->EnableKeepAlive(enable);
-}
-
+}
+
+void THttpOutput::EnableKeepAlive(bool enable) {
+ Impl_->EnableKeepAlive(enable);
+}
+
void THttpOutput::EnableBodyEncoding(bool enable) {
Impl_->EnableBodyEncoding(enable);
}
@@ -935,25 +935,25 @@ void THttpOutput::EnableCompressionHeader(bool enable) {
}
bool THttpOutput::IsKeepAliveEnabled() const noexcept {
- return Impl_->IsKeepAliveEnabled();
-}
-
+ return Impl_->IsKeepAliveEnabled();
+}
+
bool THttpOutput::IsBodyEncodingEnabled() const noexcept {
return Impl_->IsBodyEncodingEnabled();
}
bool THttpOutput::IsCompressionEnabled() const noexcept {
- return Impl_->IsCompressionEnabled();
-}
-
+ return Impl_->IsCompressionEnabled();
+}
+
bool THttpOutput::IsCompressionHeaderEnabled() const noexcept {
return Impl_->IsCompressionHeaderEnabled();
}
bool THttpOutput::CanBeKeepAlive() const noexcept {
- return Impl_->CanBeKeepAlive();
-}
-
+ return Impl_->CanBeKeepAlive();
+}
+
void THttpOutput::SendContinue() {
Impl_->SendContinue();
}
@@ -966,13 +966,13 @@ size_t THttpOutput::SentSize() const noexcept {
return Impl_->SentSize();
}
-unsigned ParseHttpRetCode(const TStringBuf& ret) {
- const TStringBuf code = StripString(StripString(ret.After(' ')).Before(' '));
-
+unsigned ParseHttpRetCode(const TStringBuf& ret) {
+ const TStringBuf code = StripString(StripString(ret.After(' ')).Before(' '));
+
return FromString<unsigned>(code.data(), code.size());
-}
+}
-void SendMinimalHttpRequest(TSocket& s, const TStringBuf& host, const TStringBuf& request, const TStringBuf& agent, const TStringBuf& from) {
+void SendMinimalHttpRequest(TSocket& s, const TStringBuf& host, const TStringBuf& request, const TStringBuf& agent, const TStringBuf& from) {
TSocketOutput so(s);
THttpOutput output(&so);
@@ -995,11 +995,11 @@ void SendMinimalHttpRequest(TSocket& s, const TStringBuf& host, const TStringBuf
IOutputStream::TPart::CrLf(),
IOutputStream::TPart::CrLf(),
};
-
+
output.Write(parts, sizeof(parts) / sizeof(*parts));
output.Finish();
}
-
+
TArrayRef<const TStringBuf> SupportedCodings() {
return TCompressionCodecFactory::Instance().GetBestCodecs();
-}
+}