#include "http2.h"
#include "conn_cache.h"
#include "details.h"
#include "factory.h"
#include "http_common.h"
#include "smart_ptr.h"
#include "utils.h"
#include <library/cpp/http/push_parser/http_parser.h>
#include <library/cpp/http/misc/httpcodes.h>
#include <library/cpp/http/misc/parsed_request.h>
#include <library/cpp/neh/asio/executor.h>
#include <util/generic/singleton.h>
#include <util/generic/vector.h>
#include <util/network/iovec.h>
#include <util/stream/output.h>
#include <util/stream/zlib.h>
#include <util/system/condvar.h>
#include <util/system/mutex.h>
#include <util/system/spinlock.h>
#include <util/system/yassert.h>
#include <util/thread/factory.h>
#include <util/thread/singleton.h>
#include <util/system/sanitizers.h>
#include <util/system/thread.h>
#include <atomic>
#if defined(_unix_)
#include <sys/ioctl.h>
#endif
#if defined(_linux_)
#undef SIOCGSTAMP
#undef SIOCGSTAMPNS
#include <linux/sockios.h>
#define FIONWRITE SIOCOUTQ
#endif
//#define DEBUG_HTTP2
#ifdef DEBUG_HTTP2
#define DBGOUT(args) Cout << args << Endl;
#else
#define DBGOUT(args)
#endif
using namespace NDns;
using namespace NAsio;
using namespace NNeh;
using namespace NNeh::NHttp;
using namespace NNeh::NHttp2;
using namespace std::placeholders;
//
// has complex keep-alive references between entities in a multi-threaded environment,
// which creates risks of races, memory leaks, etc.,
// so connecting/disconnecting entities must be done carefully
//
// handler <=-> request <==> connection(socket) <= handlers, stored in io_service
//                           ^
//                           +== connections_cache
// '=>' -- shared/intrusive ptr
// '->' -- weak_ptr
//
static TDuration FixTimeoutForSanitizer(const TDuration timeout) {
ui64 multiplier = 1;
if (NSan::ASanIsOn()) {
// https://github.com/google/sanitizers/wiki/AddressSanitizer
multiplier = 4;
} else if (NSan::MSanIsOn()) {
// via https://github.com/google/sanitizers/wiki/MemorySanitizer
multiplier = 3;
} else if (NSan::TSanIsOn()) {
// via https://clang.llvm.org/docs/ThreadSanitizer.html
multiplier = 15;
}
return TDuration::FromValue(timeout.GetValue() * multiplier);
}
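// Worked example: under TSan a 1000 ms base timeout becomes 15000 ms (1000 * 15);
// without any sanitizer the multiplier is 1 and the value is returned unchanged.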
TDuration THttp2Options::ConnectTimeout = FixTimeoutForSanitizer(TDuration::MilliSeconds(1000));
TDuration THttp2Options::InputDeadline = TDuration::Max();
TDuration THttp2Options::OutputDeadline = TDuration::Max();
TDuration THttp2Options::SymptomSlowConnect = FixTimeoutForSanitizer(TDuration::MilliSeconds(10));
size_t THttp2Options::InputBufferSize = 16 * 1024;
bool THttp2Options::KeepInputBufferForCachedConnections = false;
size_t THttp2Options::AsioThreads = 4;
size_t THttp2Options::AsioServerThreads = 4;
bool THttp2Options::EnsureSendingCompleteByAck = false;
int THttp2Options::Backlog = 100;
TDuration THttp2Options::ServerInputDeadline = FixTimeoutForSanitizer(TDuration::MilliSeconds(500));
TDuration THttp2Options::ServerOutputDeadline = TDuration::Max();
TDuration THttp2Options::ServerInputDeadlineKeepAliveMax = FixTimeoutForSanitizer(TDuration::Seconds(120));
TDuration THttp2Options::ServerInputDeadlineKeepAliveMin = FixTimeoutForSanitizer(TDuration::Seconds(10));
bool THttp2Options::ServerUseDirectWrite = false;
bool THttp2Options::UseResponseAsErrorMessage = false;
bool THttp2Options::FullHeadersAsErrorMessage = false;
bool THttp2Options::ErrorDetailsAsResponseBody = false;
bool THttp2Options::RedirectionNotError = false;
bool THttp2Options::AnyResponseIsNotError = false;
bool THttp2Options::TcpKeepAlive = false;
i32 THttp2Options::LimitRequestsPerConnection = -1;
bool THttp2Options::QuickAck = false;
bool THttp2Options::UseAsyncSendRequest = false;
bool THttp2Options::RespectHostInHttpServerNetworkAddress = false;
bool THttp2Options::Set(TStringBuf name, TStringBuf value) {
#define HTTP2_TRY_SET(optType, optName) \
if (name == TStringBuf(#optName)) { \
optName = FromString<optType>(value); \
}
HTTP2_TRY_SET(TDuration, ConnectTimeout)
else HTTP2_TRY_SET(TDuration, InputDeadline)
else HTTP2_TRY_SET(TDuration, OutputDeadline)
else HTTP2_TRY_SET(TDuration, SymptomSlowConnect)
else HTTP2_TRY_SET(size_t, InputBufferSize)
else HTTP2_TRY_SET(bool, KeepInputBufferForCachedConnections)
else HTTP2_TRY_SET(size_t, AsioThreads)
else HTTP2_TRY_SET(size_t, AsioServerThreads)
else HTTP2_TRY_SET(bool, EnsureSendingCompleteByAck)
else HTTP2_TRY_SET(int, Backlog)
else HTTP2_TRY_SET(TDuration, ServerInputDeadline)
else HTTP2_TRY_SET(TDuration, ServerOutputDeadline)
else HTTP2_TRY_SET(TDuration, ServerInputDeadlineKeepAliveMax)
else HTTP2_TRY_SET(TDuration, ServerInputDeadlineKeepAliveMin)
else HTTP2_TRY_SET(bool, ServerUseDirectWrite)
else HTTP2_TRY_SET(bool, UseResponseAsErrorMessage)
else HTTP2_TRY_SET(bool, FullHeadersAsErrorMessage)
else HTTP2_TRY_SET(bool, ErrorDetailsAsResponseBody)
else HTTP2_TRY_SET(bool, RedirectionNotError)
else HTTP2_TRY_SET(bool, AnyResponseIsNotError)
else HTTP2_TRY_SET(bool, TcpKeepAlive)
else HTTP2_TRY_SET(i32, LimitRequestsPerConnection)
else HTTP2_TRY_SET(bool, QuickAck)
else HTTP2_TRY_SET(bool, UseAsyncSendRequest) else {
return false;
}
return true;
}
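// A minimal usage sketch (hypothetical caller): options are addressed by name
// and parsed from string values via FromString<T>, e.g.:
//
//     THttp2Options::Set("ConnectTimeout", "300ms"); // TDuration
//     THttp2Options::Set("AsioThreads", "8");        // size_t
//     THttp2Options::Set("TcpKeepAlive", "1");       // bool
//
// Set() returns false for an unknown option name.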
namespace NNeh {
const NDns::TResolvedHost* Resolve(const TStringBuf host, ui16 port, NHttp::EResolverType resolverType);
}
namespace {
//#define DEBUG_STAT
#ifdef DEBUG_STAT
struct TDebugStat {
static std::atomic<size_t> ConnTotal;
static std::atomic<size_t> ConnActive;
static std::atomic<size_t> ConnCached;
static std::atomic<size_t> ConnDestroyed;
static std::atomic<size_t> ConnFailed;
static std::atomic<size_t> ConnConnCanceled;
static std::atomic<size_t> ConnSlow;
static std::atomic<size_t> Conn2Success;
static std::atomic<size_t> ConnPurgedInCache;
static std::atomic<size_t> ConnDestroyedInCache;
static std::atomic<size_t> RequestTotal;
static std::atomic<size_t> RequestSuccessed;
static std::atomic<size_t> RequestFailed;
static void Print() {
Cout << "ct=" << ConnTotal.load(std::memory_order_acquire)
<< " ca=" << ConnActive.load(std::memory_order_acquire)
<< " cch=" << ConnCached.load(std::memory_order_acquire)
<< " cd=" << ConnDestroyed.load(std::memory_order_acquire)
<< " cf=" << ConnFailed.load(std::memory_order_acquire)
<< " ccc=" << ConnConnCanceled.load(std::memory_order_acquire)
<< " csl=" << ConnSlow.load(std::memory_order_acquire)
<< " c2s=" << Conn2Success.load(std::memory_order_acquire)
<< " cpc=" << ConnPurgedInCache.load(std::memory_order_acquire)
<< " cdc=" << ConnDestroyedInCache.load(std::memory_order_acquire)
<< " rt=" << RequestTotal.load(std::memory_order_acquire)
<< " rs=" << RequestSuccessed.load(std::memory_order_acquire)
<< " rf=" << RequestFailed.load(std::memory_order_acquire)
<< Endl;
}
};
std::atomic<size_t> TDebugStat::ConnTotal = 0;
std::atomic<size_t> TDebugStat::ConnActive = 0;
std::atomic<size_t> TDebugStat::ConnCached = 0;
std::atomic<size_t> TDebugStat::ConnDestroyed = 0;
std::atomic<size_t> TDebugStat::ConnFailed = 0;
std::atomic<size_t> TDebugStat::ConnConnCanceled = 0;
std::atomic<size_t> TDebugStat::ConnSlow = 0;
std::atomic<size_t> TDebugStat::Conn2Success = 0;
std::atomic<size_t> TDebugStat::ConnPurgedInCache = 0;
std::atomic<size_t> TDebugStat::ConnDestroyedInCache = 0;
std::atomic<size_t> TDebugStat::RequestTotal = 0;
std::atomic<size_t> TDebugStat::RequestSuccessed = 0;
std::atomic<size_t> TDebugStat::RequestFailed = 0;
#endif
inline void PrepareSocket(SOCKET s, const TRequestSettings& requestSettings = TRequestSettings()) {
if (requestSettings.NoDelay) {
SetNoDelay(s, true);
}
}
bool Compress(TData& data, const TString& compressionScheme) {
if (compressionScheme == "gzip" && data.size() > 23) { // no string shorter than 24 bytes can shrink under gzip (header + trailer overhead)
try {
TData gzipped(data.size());
TMemoryOutput out(gzipped.data(), gzipped.size());
TZLibCompress c(&out, ZLib::GZip);
c.Write(data.data(), data.size());
c.Finish();
gzipped.resize(out.Buf() - gzipped.data());
data.swap(gzipped);
return true;
} catch (yexception&) {
// gzipped data occupies more space than original data
}
}
return false;
}
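// Usage sketch (hypothetical): Compress() mutates its argument in place and
// reports whether gzip actually shrank the data:
//
//     TData body = ...;                  // >= 24 bytes of response data
//     if (Compress(body, "gzip")) {
//         // body now holds gzipped data; emit "Content-Encoding: gzip"
//     }                                  // otherwise body is left unchanged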
class THttpRequestBuffers: public NAsio::TTcpSocket::IBuffers {
public:
THttpRequestBuffers(TRequestData::TPtr rd)
: Req_(rd)
, Parts_(Req_->Parts())
, IOvec_(Parts_.data(), Parts_.size())
{
}
TContIOVector* GetIOvec() override {
return &IOvec_;
}
private:
TRequestData::TPtr Req_;
TVector<IOutputStream::TPart> Parts_;
TContIOVector IOvec_;
};
struct TRequestGet1: public TRequestGet {
static inline TStringBuf Name() noexcept {
return TStringBuf("http");
}
};
struct TRequestPost1: public TRequestPost {
static inline TStringBuf Name() noexcept {
return TStringBuf("post");
}
};
struct TRequestFull1: public TRequestFull {
static inline TStringBuf Name() noexcept {
return TStringBuf("full");
}
};
struct TRequestGet2: public TRequestGet {
static inline TStringBuf Name() noexcept {
return TStringBuf("http2");
}
};
struct TRequestPost2: public TRequestPost {
static inline TStringBuf Name() noexcept {
return TStringBuf("post2");
}
};
struct TRequestFull2: public TRequestFull {
static inline TStringBuf Name() noexcept {
return TStringBuf("full2");
}
};
struct TRequestUnixSocketGet: public TRequestGet {
static inline TStringBuf Name() noexcept {
return TStringBuf("http+unix");
}
static TRequestSettings RequestSettings() {
return TRequestSettings()
.SetNoDelay(false)
.SetResolverType(EResolverType::EUNIXSOCKET);
}
};
struct TRequestUnixSocketPost: public TRequestPost {
static inline TStringBuf Name() noexcept {
return TStringBuf("post+unix");
}
static TRequestSettings RequestSettings() {
return TRequestSettings()
.SetNoDelay(false)
.SetResolverType(EResolverType::EUNIXSOCKET);
}
};
struct TRequestUnixSocketFull: public TRequestFull {
static inline TStringBuf Name() noexcept {
return TStringBuf("full+unix");
}
static TRequestSettings RequestSettings() {
return TRequestSettings()
.SetNoDelay(false)
.SetResolverType(EResolverType::EUNIXSOCKET);
}
};
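// Assumed address form for the *+unix schemes (it matches TUnixSocketResolver
// near the end of this file, which strips one pair of square brackets around
// the socket path):
//
//     http+unix://[/var/run/my-service.sock]/service-path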
typedef TAutoPtr<THttpRequestBuffers> THttpRequestBuffersPtr;
class THttpRequest;
typedef TSharedPtrB<THttpRequest> THttpRequestRef;
class THttpConn;
typedef TIntrusivePtr<THttpConn> THttpConnRef;
typedef std::function<TRequestData::TPtr(const TMessage&, const TParsedLocation&)> TRequestBuilder;
class THttpRequest {
public:
class THandle: public TSimpleHandle {
public:
THandle(IOnRecv* f, const TMessage& msg, TStatCollector* s) noexcept
: TSimpleHandle(f, msg, s)
{
}
bool MessageSendedCompletely() const noexcept override {
if (TSimpleHandle::MessageSendedCompletely()) {
return true;
}
THttpRequestRef req(GetRequest());
if (!!req && req->RequestSendedCompletely()) {
const_cast<THandle*>(this)->SetSendComplete();
}
return TSimpleHandle::MessageSendedCompletely();
}
void Cancel() noexcept override {
if (TSimpleHandle::Canceled()) {
return;
}
THttpRequestRef req(GetRequest());
if (!!req) {
TSimpleHandle::Cancel();
req->Cancel();
}
}
void NotifyError(TErrorRef error, const THttpParser* rsp = nullptr) {
#ifdef DEBUG_STAT
++TDebugStat::RequestFailed;
#endif
if (rsp) {
TSimpleHandle::NotifyError(error, rsp->DecodedContent(), rsp->FirstLine(), rsp->Headers());
} else {
TSimpleHandle::NotifyError(error);
}
ReleaseRequest();
}
//not thread safe!
void SetRequest(const TWeakPtrB<THttpRequest>& r) noexcept {
Req_ = r;
}
private:
THttpRequestRef GetRequest() const noexcept {
TGuard<TSpinLock> g(SP_);
return Req_;
}
void ReleaseRequest() noexcept {
TWeakPtrB<THttpRequest> tmp;
TGuard<TSpinLock> g(SP_);
tmp.Swap(Req_);
}
mutable TSpinLock SP_;
TWeakPtrB<THttpRequest> Req_;
};
typedef TIntrusivePtr<THandle> THandleRef;
static void Run(THandleRef& h, const TMessage& msg, TRequestBuilder f, const TRequestSettings& s) {
THttpRequestRef req(new THttpRequest(h, msg, f, s));
req->WeakThis_ = req;
h->SetRequest(req->WeakThis_);
req->Run(req);
}
~THttpRequest() {
DBGOUT("~THttpRequest()");
}
private:
THttpRequest(THandleRef& h, TMessage msg, TRequestBuilder f, const TRequestSettings& s)
: Hndl_(h)
, RequestBuilder_(f)
, RequestSettings_(s)
, Msg_(std::move(msg))
, Loc_(Msg_.Addr)
, Addr_(Resolve(Loc_.Host, Loc_.GetPort(), RequestSettings_.ResolverType))
, AddrIter_(Addr_->Addr.Begin())
, Canceled_(false)
, RequestSendedCompletely_(false)
{
}
void Run(THttpRequestRef& req);
public:
THttpRequestBuffersPtr BuildRequest() {
return new THttpRequestBuffers(RequestBuilder_(Msg_, Loc_));
}
TRequestSettings RequestSettings() {
return RequestSettings_;
}
//can create a spare socket in an attempt to decrease connecting time
void OnDetectSlowConnecting();
//remove the extra connection on successful connect
void OnConnect(THttpConn* c);
//some response input has arrived
void OnBeginRead() noexcept {
RequestSendedCompletely_ = true;
}
void OnResponse(TAutoPtr<THttpParser>& rsp);
void OnConnectFailed(THttpConn* c, const TErrorCode& ec);
void OnSystemError(THttpConn* c, const TErrorCode& ec);
void OnError(THttpConn* c, const TString& errorText);
bool RequestSendedCompletely() noexcept;
void Cancel() noexcept;
private:
void NotifyResponse(const TString& resp, const TString& firstLine, const THttpHeaders& headers) {
THandleRef h(ReleaseHandler());
if (!!h) {
h->NotifyResponse(resp, firstLine, headers);
}
}
void NotifyError(
const TString& errorText,
TError::TType errorType = TError::UnknownType,
i32 errorCode = 0, i32 systemErrorCode = 0) {
NotifyError(new TError(errorText, errorType, errorCode, systemErrorCode));
}
void NotifyError(TErrorRef error, const THttpParser* rsp = nullptr) {
THandleRef h(ReleaseHandler());
if (!!h) {
h->NotifyError(error, rsp);
}
}
void Finalize(THttpConn* skipConn = nullptr) noexcept;
inline THandleRef ReleaseHandler() noexcept {
THandleRef h;
{
TGuard<TSpinLock> g(SL_);
h.Swap(Hndl_);
}
return h;
}
inline THttpConnRef GetConn() noexcept {
TGuard<TSpinLock> g(SL_);
return Conn_;
}
inline THttpConnRef ReleaseConn() noexcept {
THttpConnRef c;
{
TGuard<TSpinLock> g(SL_);
c.Swap(Conn_);
}
return c;
}
inline THttpConnRef ReleaseConn2() noexcept {
THttpConnRef c;
{
TGuard<TSpinLock> g(SL_);
c.Swap(Conn2_);
}
return c;
}
TSpinLock SL_; //guarantees notify() is called only once (prevents a race between the asio thread and the current one)
THandleRef Hndl_;
TRequestBuilder RequestBuilder_;
TRequestSettings RequestSettings_;
const TMessage Msg_;
const TParsedLocation Loc_;
const TResolvedHost* Addr_;
TNetworkAddress::TIterator AddrIter_;
THttpConnRef Conn_;
THttpConnRef Conn2_; //concurrent connection, used if slow connecting is detected on the first connection
TWeakPtrB<THttpRequest> WeakThis_;
TAtomicBool Canceled_;
TAtomicBool RequestSendedCompletely_;
};
TAtomicCounter* HttpOutConnCounter();
class THttpConn: public TThrRefBase {
public:
static THttpConnRef Create(TIOService& srv);
~THttpConn() override {
DBGOUT("~THttpConn()");
Req_.Reset();
HttpOutConnCounter()->Dec();
#ifdef DEBUG_STAT
++TDebugStat::ConnDestroyed;
#endif
}
void StartRequest(THttpRequestRef req, const TEndpoint& ep, size_t addrId, TDuration slowConn) {
{
//thread safe linking connection->request
TGuard<TSpinLock> g(SL_);
Req_ = req;
}
AddrId_ = addrId;
try {
TDuration connectDeadline(THttp2Options::ConnectTimeout);
if (THttp2Options::ConnectTimeout > slowConn) {
//use an additional non-fatal connect deadline, so on the first timeout
//we report slow connecting to THttpRequest and keep waiting for the ConnectDeadline_ period
connectDeadline = slowConn;
ConnectDeadline_ = THttp2Options::ConnectTimeout - slowConn;
}
DBGOUT("AsyncConnect to " << ep.IpToString());
AS_.AsyncConnect(ep, std::bind(&THttpConn::OnConnect, THttpConnRef(this), _1, _2), connectDeadline);
} catch (...) {
ReleaseRequest();
throw;
}
}
//start next request on keep-alive connection
bool StartNextRequest(THttpRequestRef& req) {
if (Finalized_) {
return false;
}
{
//thread safe linking connection->request
TGuard<TSpinLock> g(SL_);
Req_ = req;
}
RequestWritten_ = false;
BeginReadResponse_ = false;
try {
TErrorCode ec;
SendRequest(req->BuildRequest(), ec); //throw std::bad_alloc
if (ec.Value() == ECANCELED) {
OnCancel();
} else if (ec) {
OnError(ec);
}
} catch (...) {
OnError(CurrentExceptionMessage());
throw;
}
return true;
}
//a connection received from the cache must be validated before use
//(removing a closed connection from the cache takes some time)
inline bool IsValid() const noexcept {
return !Finalized_;
}
void SetCached(bool v) noexcept {
Cached_ = v;
}
void Close() noexcept {
try {
Cancel();
} catch (...) {
}
}
void DetachRequest() noexcept {
ReleaseRequest();
}
void Cancel() { //throw std::bad_alloc
if (!Canceled_) {
Canceled_ = true;
Finalized_ = true;
OnCancel();
AS_.AsyncCancel();
}
}
void OnCancel() {
THttpRequestRef r(ReleaseRequest());
if (!!r) {
static const TString reqCanceled("request canceled");
r->OnError(this, reqCanceled);
}
}
bool RequestSendedCompletely() const noexcept {
DBGOUT("RequestSendedCompletely()");
if (!Connected_ || !RequestWritten_) {
return false;
}
if (BeginReadResponse_) {
return true;
}
#if defined(FIONWRITE)
if (THttp2Options::EnsureSendingCompleteByAck) {
int nbytes = Max<int>();
int err = ioctl(AS_.Native(), FIONWRITE, &nbytes);
return err ? false : nbytes == 0;
}
#endif
return true;
}
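// Note: on linux FIONWRITE is mapped to SIOCOUTQ above, so the ioctl in
// RequestSendedCompletely() reports the number of bytes still queued in the
// socket send buffer; zero means everything written has been acked by the
// peer, hence "EnsureSendingCompleteByAck".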
TIOService& GetIOService() const noexcept {
return AS_.GetIOService();
}
private:
THttpConn(TIOService& srv)
: AddrId_(0)
, AS_(srv)
, BuffSize_(THttp2Options::InputBufferSize)
, Connected_(false)
, Cached_(false)
, Canceled_(false)
, Finalized_(false)
, InAsyncRead_(false)
, RequestWritten_(false)
, BeginReadResponse_(false)
{
HttpOutConnCounter()->Inc();
}
//can be called only from asio
void OnConnect(const TErrorCode& ec, IHandlingContext& ctx) {
DBGOUT("THttpConn::OnConnect: " << ec.Value());
if (Y_UNLIKELY(ec)) {
if (ec.Value() == ETIMEDOUT && ConnectDeadline_.GetValue()) {
//slow connecting detected (final timeout not yet reached)
DBGOUT("OnConnectTimingCheck");
THttpRequestRef req(GetRequest());
if (!req) {
return; //a cancel from the client thread can get ahead of us
}
TDuration newDeadline(ConnectDeadline_);
ConnectDeadline_ = TDuration::Zero(); //next timeout is final
req->OnDetectSlowConnecting();
//continue waiting for the connect
ctx.ContinueUseHandler(newDeadline);
return;
}
#ifdef DEBUG_STAT
if (ec.Value() != ECANCELED) {
++TDebugStat::ConnFailed;
} else {
++TDebugStat::ConnConnCanceled;
}
#endif
if (ec.Value() == EIO) {
//try to get more detailed error info
char buf[1];
TErrorCode errConnect;
AS_.ReadSome(buf, 1, errConnect);
OnConnectFailed(errConnect.Value() ? errConnect : ec);
} else if (ec.Value() == ECANCELED) {
// don't try connecting to the next host ip addr, simply fail
OnError(ec);
} else {
OnConnectFailed(ec);
}
} else {
Connected_ = true;
THttpRequestRef req(GetRequest());
if (!req || Canceled_) {
return;
}
try {
PrepareSocket(AS_.Native(), req->RequestSettings());
if (THttp2Options::TcpKeepAlive) {
SetKeepAlive(AS_.Native(), true);
}
} catch (TSystemError& err) {
TErrorCode ec2(err.Status());
OnError(ec2);
return;
}
req->OnConnect(this);
THttpRequestBuffersPtr ptr(req->BuildRequest());
PrepareParser();
TErrorCode ec3;
SendRequest(ptr, ec3);
if (ec3) {
OnError(ec3);
}
}
}
void PrepareParser() {
Prs_ = new THttpParser();
Prs_->Prepare();
}
void SendRequest(const THttpRequestBuffersPtr& bfs, TErrorCode& ec) { //throw std::bad_alloc
if (!THttp2Options::UseAsyncSendRequest) {
size_t amount = AS_.WriteSome(*bfs->GetIOvec(), ec);
if (ec && ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK && ec.Value() != EINPROGRESS) {
return;
}
ec.Assign(0);
bfs->GetIOvec()->Proceed(amount);
if (bfs->GetIOvec()->Complete()) {
RequestWritten_ = true;
StartRead();
} else {
SendRequestAsync(bfs);
}
} else {
SendRequestAsync(bfs);
}
}
void SendRequestAsync(const THttpRequestBuffersPtr& bfs) {
NAsio::TTcpSocket::TSendedData sd(bfs.Release());
AS_.AsyncWrite(sd, std::bind(&THttpConn::OnWrite, THttpConnRef(this), _1, _2, _3), THttp2Options::OutputDeadline);
}
void OnWrite(const TErrorCode& err, size_t amount, IHandlingContext& ctx) {
Y_UNUSED(amount);
Y_UNUSED(ctx);
if (err) {
OnError(err);
} else {
DBGOUT("OnWrite()");
RequestWritten_ = true;
StartRead();
}
}
inline void StartRead() {
if (!InAsyncRead_ && !Canceled_) {
InAsyncRead_ = true;
AS_.AsyncPollRead(std::bind(&THttpConn::OnCanRead, THttpConnRef(this), _1, _2), THttp2Options::InputDeadline);
}
}
//can be called only from asio
void OnReadSome(const TErrorCode& err, size_t bytes, IHandlingContext& ctx) {
if (Y_UNLIKELY(err)) {
OnError(err);
return;
}
if (!BeginReadResponse_) {
//used in MessageSendedCompletely()
BeginReadResponse_ = true;
THttpRequestRef r(GetRequest());
if (!!r) {
r->OnBeginRead();
}
}
DBGOUT("receive:" << TStringBuf(Buff_.Get(), bytes));
try {
if (!Prs_) {
throw yexception() << TStringBuf("received data while no request is in flight");
}
#if defined(_linux_)
if (THttp2Options::QuickAck) {
SetSockOpt(AS_.Native(), SOL_TCP, TCP_QUICKACK, (int)1);
}
#endif
DBGOUT("parse:");
while (!Prs_->Parse(Buff_.Get(), bytes)) {
if (BuffSize_ == bytes) {
TErrorCode ec;
bytes = AS_.ReadSome(Buff_.Get(), BuffSize_, ec);
if (!ec) {
continue;
}
if (ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK) {
OnError(ec);
return;
}
}
//continue async. read from socket
ctx.ContinueUseHandler(THttp2Options::InputDeadline);
return;
}
//successfully reached the end of the http response
THttpRequestRef r(ReleaseRequest());
if (!r) {
//lost the race to request canceling
DBGOUT("connection failed");
return;
}
DBGOUT("response:");
bool keepALive = Prs_->IsKeepAlive();
r->OnResponse(Prs_);
if (!keepALive) {
return;
}
//continue using the connection (keep-alive mode)
PrepareParser();
if (!THttp2Options::KeepInputBufferForCachedConnections) {
Buff_.Destroy();
}
//continue async. read from socket
ctx.ContinueUseHandler(THttp2Options::InputDeadline);
PutSelfToCache();
} catch (...) {
OnError(CurrentExceptionMessage());
}
}
void PutSelfToCache();
//reacts to input data on a re-used keep-alive connection
//whose buffer was freed/released when the connection was placed in the cache
void OnCanRead(const TErrorCode& err, IHandlingContext& ctx) {
if (Y_UNLIKELY(err)) {
OnError(err);
} else {
if (!Buff_) {
Buff_.Reset(new char[BuffSize_]);
}
TErrorCode ec;
OnReadSome(ec, AS_.ReadSome(Buff_.Get(), BuffSize_, ec), ctx);
}
}
//unlink connection and request; thread-safely mark the connection as invalid
inline THttpRequestRef GetRequest() noexcept {
TGuard<TSpinLock> g(SL_);
return Req_;
}
inline THttpRequestRef ReleaseRequest() noexcept {
THttpRequestRef r;
{
TGuard<TSpinLock> g(SL_);
r.Swap(Req_);
}
return r;
}
void OnConnectFailed(const TErrorCode& ec);
inline void OnError(const TErrorCode& ec) {
OnError(ec.Text());
}
inline void OnError(const TString& errText);
size_t AddrId_;
NAsio::TTcpSocket AS_;
TArrayHolder<char> Buff_; //input buffer
const size_t BuffSize_;
TAutoPtr<THttpParser> Prs_; //input data parser & parsed info storage
TSpinLock SL_;
THttpRequestRef Req_; //current request
TDuration ConnectDeadline_;
TAtomicBool Connected_;
TAtomicBool Cached_;
TAtomicBool Canceled_;
TAtomicBool Finalized_;
bool InAsyncRead_;
TAtomicBool RequestWritten_;
TAtomicBool BeginReadResponse_;
};
//monitors connection limits, cleans the cache, and owns the asio threads/executors used by http clients
class THttpConnManager: public IThreadFactory::IThreadAble {
public:
THttpConnManager()
: TotalConn(0)
, EP_(THttp2Options::AsioThreads)
, InPurging_(0)
, MaxConnId_(0)
, Shutdown_(false)
{
T_ = SystemThreadFactory()->Run(this);
Limits.SetSoft(40000);
Limits.SetHard(50000);
}
~THttpConnManager() override {
{
TGuard<TMutex> g(PurgeMutex_);
Shutdown_ = true;
CondPurge_.Signal();
}
EP_.SyncShutdown();
T_->Join();
}
inline void SetLimits(size_t softLimit, size_t hardLimit) noexcept {
Limits.SetSoft(softLimit);
Limits.SetHard(hardLimit);
}
inline std::pair<size_t, size_t> GetLimits() const noexcept {
return {Limits.Soft(), Limits.Hard()};
}
inline void CheckLimits() {
if (ExceedSoftLimit()) {
SuggestPurgeCache();
if (ExceedHardLimit()) {
Y_ABORT("neh::http2 output connections limit reached");
//ythrow yexception() << "neh::http2 output connections limit reached";
}
}
}
inline bool Get(THttpConnRef& conn, size_t addrId) {
#ifdef DEBUG_STAT
TDebugStat::ConnTotal.store(TotalConn.Val(), std::memory_order_release);
TDebugStat::ConnActive.store(Active(), std::memory_order_release);
TDebugStat::ConnCached.store(Cache_.Size(), std::memory_order_release);
#endif
return Cache_.Get(conn, addrId);
}
inline void Put(THttpConnRef& conn, size_t addrId) {
if (Y_LIKELY(!Shutdown_ && !ExceedHardLimit() && !CacheDisabled())) {
if (Y_UNLIKELY(addrId > (size_t)AtomicGet(MaxConnId_))) {
AtomicSet(MaxConnId_, addrId);
}
Cache_.Put(conn, addrId);
} else {
conn->Close();
conn.Drop();
}
}
inline size_t OnConnError(size_t addrId) {
return Cache_.Validate(addrId);
}
TIOService& GetIOService() {
return EP_.GetExecutor().GetIOService();
}
bool CacheDisabled() const {
return Limits.Soft() == 0;
}
bool IsShutdown() const noexcept {
return Shutdown_;
}
TAtomicCounter TotalConn;
private:
inline size_t Total() const noexcept {
return TotalConn.Val();
}
inline size_t Active() const noexcept {
return TFdLimits::ExceedLimit(Total(), Cache_.Size());
}
inline size_t ExceedSoftLimit() const noexcept {
return TFdLimits::ExceedLimit(Total(), Limits.Soft());
}
inline size_t ExceedHardLimit() const noexcept {
return TFdLimits::ExceedLimit(Total(), Limits.Hard());
}
void SuggestPurgeCache() {
if (AtomicTryLock(&InPurging_)) {
//evaluate the usefulness of purging the cache
//if the cache holds few connections (< MaxConnId_/16, at most 64), skip purging
if (Cache_.Size() > (Min((size_t)AtomicGet(MaxConnId_), (size_t)1024U) >> 4)) {
//as we approach the hard limit, the need to purge the cache approaches 100%
size_t closenessToHardLimit256 = ((Active() + 1) << 8) / (Limits.Delta() + 1);
//the more connections sit in the cache rather than in use, the less the cache is needed (so it can be purged)
size_t cacheUselessness256 = ((Cache_.Size() + 1) << 8) / (Active() + 1);
//resulting trigger thresholds:
//- on reaching the soft limit, if connections sit in the cache rather than in use
//- halfway from the soft limit to the hard limit, if more than half of the connections are cached
//- approaching the hard limit, try to purge the cache almost constantly
if ((closenessToHardLimit256 + cacheUselessness256) >= 256U) {
TGuard<TMutex> g(PurgeMutex_);
CondPurge_.Signal();
return; //memo: thread MUST unlock InPurging_ (see DoExecute())
}
}
AtomicUnlock(&InPurging_);
}
}
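// Worked example of the heuristic above (assumed limits soft=40000, hard=50000,
// so Limits.Delta() == 10000): with Active() == 2500 and Cache_.Size() == 2500,
// closenessToHardLimit256 = ((2500 + 1) << 8) / 10001 ~= 64 and
// cacheUselessness256 = ((2500 + 1) << 8) / 2501 ~= 256; the sum 320 >= 256,
// so a purge is signaled. With few cached connections the sum stays below 256
// and the cache is left alone.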
void DoExecute() override {
TThread::SetCurrentThreadName("NehHttpConnMngr");
while (true) {
{
TGuard<TMutex> g(PurgeMutex_);
if (Shutdown_)
return;
CondPurge_.WaitI(PurgeMutex_);
}
PurgeCache();
AtomicUnlock(&InPurging_);
}
}
void PurgeCache() noexcept {
//try to remove at least ExceedSoftLimit() of the oldest connections from the cache
//compute the fraction of the cache to purge (in 256ths, but at least 1/32 of the cache)
size_t frac256 = Min(size_t(Max(size_t(256U / 32U), (ExceedSoftLimit() << 8) / (Cache_.Size() + 1))), (size_t)256U);
size_t processed = 0;
size_t maxConnId = AtomicGet(MaxConnId_);
for (size_t i = 0; i <= maxConnId && !Shutdown_; ++i) {
processed += Cache_.Purge(i, frac256);
if (processed > 32) {
#ifdef DEBUG_STAT
TDebugStat::ConnPurgedInCache += processed;
#endif
processed = 0;
Sleep(TDuration::MilliSeconds(10)); //prevent big spikes in cpu/system usage
}
}
}
TFdLimits Limits;
TExecutorsPool EP_;
TConnCache<THttpConn> Cache_;
TAtomic InPurging_;
TAtomic MaxConnId_;
TAutoPtr<IThreadFactory::IThread> T_;
TCondVar CondPurge_;
TMutex PurgeMutex_;
TAtomicBool Shutdown_;
};
THttpConnManager* HttpConnManager() {
return Singleton<THttpConnManager>();
}
TAtomicCounter* HttpOutConnCounter() {
return &HttpConnManager()->TotalConn;
}
THttpConnRef THttpConn::Create(TIOService& srv) {
if (HttpConnManager()->IsShutdown()) {
throw yexception() << "can't create connection: service is shut down";
}
return new THttpConn(srv);
}
void THttpConn::PutSelfToCache() {
THttpConnRef c(this);
HttpConnManager()->Put(c, AddrId_);
}
void THttpConn::OnConnectFailed(const TErrorCode& ec) {
THttpRequestRef r(GetRequest());
if (!!r) {
r->OnConnectFailed(this, ec);
}
OnError(ec);
}
void THttpConn::OnError(const TString& errText) {
Finalized_ = true;
if (Connected_) {
Connected_ = false;
TErrorCode ec;
AS_.Shutdown(NAsio::TTcpSocket::ShutdownBoth, ec);
} else {
if (AS_.IsOpen()) {
AS_.AsyncCancel();
}
}
THttpRequestRef r(ReleaseRequest());
if (!!r) {
r->OnError(this, errText);
} else {
if (Cached_) {
size_t res = HttpConnManager()->OnConnError(AddrId_);
Y_UNUSED(res);
#ifdef DEBUG_STAT
TDebugStat::ConnDestroyedInCache += res;
#endif
}
}
}
void THttpRequest::Run(THttpRequestRef& req) {
#ifdef DEBUG_STAT
if ((++TDebugStat::RequestTotal & 0xFFF) == 0) {
TDebugStat::Print();
}
#endif
try {
while (!Canceled_) {
THttpConnRef conn;
if (HttpConnManager()->Get(conn, Addr_->Id)) {
DBGOUT("Use connection from cache");
Conn_ = conn; //thread magic
if (!conn->StartNextRequest(req)) {
continue; //when using a connection from the cache, ignore the write error and try another conn
}
} else {
HttpConnManager()->CheckLimits(); //throws an exception if the hard limit is reached (or in atexit() state)
Conn_ = THttpConn::Create(HttpConnManager()->GetIOService());
TEndpoint ep(new NAddr::TAddrInfo(&*AddrIter_));
Conn_->StartRequest(req, ep, Addr_->Id, THttp2Options::SymptomSlowConnect); // can throw
}
break;
}
} catch (...) {
Conn_.Reset();
throw;
}
}
//it seems we have lost a TCP SYN packet; create an extra connection to decrease response time
void THttpRequest::OnDetectSlowConnecting() {
#ifdef DEBUG_STAT
++TDebugStat::ConnSlow;
#endif
//use the io_service (Run() thread-executor) of the first conn. for more thread safety
THttpConnRef conn = GetConn();
if (!conn) {
return;
}
THttpConnRef conn2;
try {
conn2 = THttpConn::Create(conn->GetIOService());
} catch (...) {
return; // can't create a spare connection, simply continue using only the main one
}
{
TGuard<TSpinLock> g(SL_);
Conn2_ = conn2;
}
if (Y_UNLIKELY(Canceled_)) {
ReleaseConn2();
} else {
//use the full connect timeout to disable slow-connect detection on the second conn.
TEndpoint ep(new NAddr::TAddrInfo(&*Addr_->Addr.Begin()));
try {
conn2->StartRequest(WeakThis_, ep, Addr_->Id, THttp2Options::ConnectTimeout);
} catch (...) {
// ignore errors on spare connection
ReleaseConn2();
}
}
}
void THttpRequest::OnConnect(THttpConn* c) {
THttpConnRef extraConn;
{
TGuard<TSpinLock> g(SL_);
if (Y_UNLIKELY(!!Conn2_)) {
//has a paired concurrent conn, 'only one should stay'
if (Conn2_.Get() == c) {
#ifdef DEBUG_STAT
++TDebugStat::Conn2Success;
#endif
Conn2_.Swap(Conn_);
}
extraConn.Swap(Conn2_);
}
}
if (!!extraConn) {
extraConn->DetachRequest(); //prevent OnError() from being called
extraConn->Close();
}
}
void THttpRequest::OnResponse(TAutoPtr<THttpParser>& rsp) {
DBGOUT("THttpRequest::OnResponse()");
ReleaseConn();
if (Y_LIKELY(((rsp->RetCode() >= 200 && rsp->RetCode() < (!THttp2Options::RedirectionNotError ? 300 : 400)) || THttp2Options::AnyResponseIsNotError))) {
NotifyResponse(rsp->DecodedContent(), rsp->FirstLine(), rsp->Headers());
} else {
TString message;
if (THttp2Options::FullHeadersAsErrorMessage) {
TStringStream err;
err << rsp->FirstLine();
const THttpHeaders& hdrs = rsp->Headers();
for (const auto& h : hdrs) {
err << h.ToString() << TStringBuf("\r\n");
}
message = err.Str();
} else if (THttp2Options::UseResponseAsErrorMessage) {
message = rsp->DecodedContent();
} else {
TStringStream err;
err << TStringBuf("request failed(") << rsp->FirstLine() << TStringBuf(")");
message = err.Str();
}
NotifyError(new TError(message, TError::ProtocolSpecific, rsp->RetCode()), rsp.Get());
}
}
void THttpRequest::OnConnectFailed(THttpConn* c, const TErrorCode& ec) {
DBGOUT("THttpRequest::OnConnectFailed()");
//detach/discard the failed conn, try connecting to the next ip addr (if possible)
THttpConnRef cc(GetConn());
if (c != cc.Get() || AddrIter_ == Addr_->Addr.End() || ++AddrIter_ == Addr_->Addr.End() || Canceled_) {
return OnSystemError(c, ec);
}
// can try next host addr
c->DetachRequest();
c->Close();
THttpConnRef nextConn;
try {
nextConn = THttpConn::Create(HttpConnManager()->GetIOService());
} catch (...) {
OnSystemError(nullptr, ec);
return;
}
{
THttpConnRef nc = nextConn;
TGuard<TSpinLock> g(SL_);
Conn_.Swap(nc);
}
TEndpoint ep(new NAddr::TAddrInfo(&*AddrIter_));
try {
nextConn->StartRequest(WeakThis_, ep, Addr_->Id, THttp2Options::SymptomSlowConnect);
} catch (...) {
OnError(nullptr, CurrentExceptionMessage());
return;
}
if (Canceled_) {
OnError(nullptr, "canceled");
}
}
void THttpRequest::OnSystemError(THttpConn* c, const TErrorCode& ec) {
DBGOUT("THttpRequest::OnSystemError()");
NotifyError(ec.Text(), TError::TType::UnknownType, 0, ec.Value());
Finalize(c);
}
void THttpRequest::OnError(THttpConn* c, const TString& errorText) {
DBGOUT("THttpRequest::OnError()");
NotifyError(errorText);
Finalize(c);
}
bool THttpRequest::RequestSendedCompletely() noexcept {
if (RequestSendedCompletely_) {
return true;
}
THttpConnRef c(GetConn());
return !!c ? c->RequestSendedCompletely() : false;
}
void THttpRequest::Cancel() noexcept {
if (!Canceled_) {
Canceled_ = true;
try {
static const TString canceled("Canceled");
NotifyError(canceled, TError::Cancelled);
Finalize();
} catch (...) {
}
}
}
inline void FinalizeConn(THttpConnRef& c, THttpConn* skipConn) noexcept {
if (!!c && c.Get() != skipConn) {
c->DetachRequest();
c->Close();
}
}
void THttpRequest::Finalize(THttpConn* skipConn) noexcept {
THttpConnRef c1(ReleaseConn());
FinalizeConn(c1, skipConn);
THttpConnRef c2(ReleaseConn2());
FinalizeConn(c2, skipConn);
}
/////////////////////////////////// server side ////////////////////////////////////
TAtomicCounter* HttpInConnCounter() {
return Singleton<TAtomicCounter>();
}
TFdLimits* HttpInConnLimits() {
return Singleton<TFdLimits>();
}
class THttpServer: public IRequester {
typedef TAutoPtr<TTcpAcceptor> TTcpAcceptorPtr;
typedef TAtomicSharedPtr<TTcpSocket> TTcpSocketRef;
class TConn;
typedef TSharedPtrB<TConn> TConnRef;
class TRequest: public IHttpRequest {
public:
TRequest(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser>& p)
: C_(c)
, P_(p)
, RemoteHost_(C_->RemoteHost())
, CompressionScheme_(P_->GetBestCompressionScheme())
, H_(TStringBuf(P_->FirstLine()))
{
}
~TRequest() override {
if (!!C_) {
try {
C_->SendError(Id(), 503, "service unavailable (request ignored)", P_->HttpVersion(), {});
} catch (...) {
DBGOUT("~TRequest()::SendFail() exception");
}
}
}
TAtomicBase Id() const {
return Id_;
}
protected:
TStringBuf Scheme() const override {
return TStringBuf("http");
}
TString RemoteHost() const override {
return RemoteHost_;
}
TStringBuf Service() const override {
return TStringBuf(H_.Path).Skip(1);
}
const THttpHeaders& Headers() const override {
return P_->Headers();
}
TStringBuf Method() const override {
return H_.Method;
}
TStringBuf Body() const override {
return P_->DecodedContent();
}
TStringBuf Cgi() const override {
return H_.Cgi;
}
TStringBuf RequestId() const override {
return TStringBuf();
}
bool Canceled() const override {
if (!C_) {
return false;
}
return C_->IsCanceled();
}
void SendReply(TData& data) override {
SendReply(data, TString(), HttpCodes::HTTP_OK);
}
void SendReply(TData& data, const TString& headers, int httpCode) override {
if (!!C_) {
C_->Send(Id(), data, CompressionScheme_, P_->HttpVersion(), headers, httpCode);
C_.Reset();
}
}
void SendError(TResponseError err, const THttpErrorDetails& details) override {
static const unsigned errorToHttpCode[IRequest::MaxResponseError] =
{
400,
403,
404,
429,
500,
501,
502,
503,
509};
if (!!C_) {
C_->SendError(Id(), errorToHttpCode[err], details.Details, P_->HttpVersion(), details.Headers);
C_.Reset();
}
}
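// Note: ids start at 1; id 0 is reserved as the "no primary response yet"
// marker in PrimaryResponse_ (see SendData()/ProcessPipeline()), which is why
// NextId() skips a zero result on wrap-around.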
static TAtomicBase NextId() {
static TAtomic idGenerator = 0;
TAtomicBase id = 0;
do {
id = AtomicIncrement(idGenerator);
} while (!id);
return id;
}
TSharedPtrB<TConn> C_;
TAutoPtr<THttpParser> P_;
TString RemoteHost_;
TString CompressionScheme_;
TParsedHttpFull H_;
TAtomicBase Id_ = NextId();
};
class TRequestGet: public TRequest {
public:
TRequestGet(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser> p)
: TRequest(c, p)
{
}
TStringBuf Data() const override {
return H_.Cgi;
}
};
class TRequestPost: public TRequest {
public:
TRequestPost(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser> p)
: TRequest(c, p)
{
}
TStringBuf Data() const override {
return P_->DecodedContent();
}
};
class TConn {
private:
TConn(THttpServer& hs, const TTcpSocketRef& s)
: HS_(hs)
, AS_(s)
, RemoteHost_(NNeh::PrintHostByRfc(*AS_->RemoteEndpoint().Addr()))
, BuffSize_(THttp2Options::InputBufferSize)
, Buff_(new char[BuffSize_])
, Canceled_(false)
, LeftRequestsToDisconnect_(hs.LimitRequestsPerConnection)
{
DBGOUT("THttpServer::TConn()");
HS_.OnCreateConn();
}
inline TConnRef SelfRef() noexcept {
return WeakThis_;
}
public:
static void Create(THttpServer& hs, const TTcpSocketRef& s) {
TSharedPtrB<TConn> conn(new TConn(hs, s));
conn->WeakThis_ = conn;
conn->ExpectNewRequest();
conn->AS_->AsyncPollRead(std::bind(&TConn::OnCanRead, conn, _1, _2), THttp2Options::ServerInputDeadline);
}
~TConn() {
DBGOUT("~THttpServer::TConn(" << (!AS_ ? -666 : AS_->Native()) << ")");
HS_.OnDestroyConn();
}
private:
void ExpectNewRequest() {
P_.Reset(new THttpParser(THttpParser::Request));
P_->Prepare();
}
void OnCanRead(const TErrorCode& ec, IHandlingContext& ctx) {
if (ec) {
OnError();
} else {
TErrorCode ec2;
OnReadSome(ec2, AS_->ReadSome(Buff_.Get(), BuffSize_, ec2), ctx);
}
}
void OnError() {
DBGOUT("Srv OnError(" << (!AS_ ? -666 : AS_->Native()) << ")");
Canceled_ = true;
AS_->AsyncCancel();
}
void OnReadSome(const TErrorCode& ec, size_t amount, IHandlingContext& ctx) {
if (ec || !amount) {
OnError();
return;
}
DBGOUT("ReadSome(" << (!AS_ ? -666 : AS_->Native()) << "): " << amount);
try {
size_t buffPos = 0;
//DBGOUT("receive and parse: " << TStringBuf(Buff_.Get(), amount));
while (P_->Parse(Buff_.Get() + buffPos, amount - buffPos)) {
if (!P_->IsKeepAlive() || LeftRequestsToDisconnect_ == 1) {
SeenMessageWithoutKeepalive_ = true;
}
char rt = *P_->FirstLine().data();
const size_t extraDataSize = P_->GetExtraDataSize();
if (rt == 'P' || rt == 'p') {
OnRequest(new TRequestPost(WeakThis_, P_));
} else {
OnRequest(new TRequestGet(WeakThis_, P_));
}
if (extraDataSize) {
// has http pipelining
buffPos = amount - extraDataSize;
ExpectNewRequest();
} else {
ExpectNewRequest();
ctx.ContinueUseHandler(HS_.GetKeepAliveTimeout());
return;
}
}
ctx.ContinueUseHandler(THttp2Options::ServerInputDeadline);
} catch (...) {
OnError();
}
}
void OnRequest(TRequest* r) {
DBGOUT("OnRequest()");
if (AtomicGet(PrimaryResponse_)) {
// has pipelining
PipelineOrder_.Enqueue(r->Id());
} else {
AtomicSet(PrimaryResponse_, r->Id());
}
HS_.OnRequest(r);
OnRequestDone();
}
void OnRequestDone() {
DBGOUT("OnRequestDone()");
if (LeftRequestsToDisconnect_ > 0) {
--LeftRequestsToDisconnect_;
}
}
static void PrintHttpVersion(IOutputStream& out, const THttpVersion& ver) {
out << TStringBuf("HTTP/") << ver.Major << TStringBuf(".") << ver.Minor;
}
struct TResponseData : TThrRefBase {
TResponseData(size_t reqId, TTcpSocket::TSendedData data)
: RequestId_(reqId)
, Data_(data)
{
}
size_t RequestId_;
TTcpSocket::TSendedData Data_;
};
typedef TIntrusivePtr<TResponseData> TResponseDataRef;
public:
//not thread-safe (called from an outside thread)
void Send(TAtomicBase requestId, TData& data, const TString& compressionScheme, const THttpVersion& ver, const TString& headers, int httpCode) {
class THttpResponseFormatter {
public:
THttpResponseFormatter(TData& theData, const TString& contentEncoding, const THttpVersion& theVer, const TString& theHeaders, int theHttpCode, bool closeConnection) {
Header.Reserve(128 + contentEncoding.size() + theHeaders.size());
PrintHttpVersion(Header, theVer);
Header << TStringBuf(" ") << theHttpCode << ' ' << HttpCodeStr(theHttpCode);
if (Compress(theData, contentEncoding)) {
Header << TStringBuf("\r\nContent-Encoding: ") << contentEncoding;
}
Header << TStringBuf("\r\nContent-Length: ") << theData.size();
if (closeConnection) {
Header << TStringBuf("\r\nConnection: close");
} else if (Y_LIKELY(theVer.Major > 1 || theVer.Minor > 0)) {
// since HTTP/1.1 Keep-Alive is default behaviour
Header << TStringBuf("\r\nConnection: Keep-Alive");
}
if (theHeaders) {
Header << theHeaders;
}
Header << TStringBuf("\r\n\r\n");
Body.swap(theData);
Parts[0].buf = Header.Data();
Parts[0].len = Header.Size();
Parts[1].buf = Body.data();
Parts[1].len = Body.size();
}
TStringStream Header;
TData Body;
IOutputStream::TPart Parts[2];
};
class TBuffers: public THttpResponseFormatter, public TTcpSocket::IBuffers {
public:
TBuffers(TData& theData, const TString& contentEncoding, const THttpVersion& theVer, const TString& theHeaders, int theHttpCode, bool closeConnection)
: THttpResponseFormatter(theData, contentEncoding, theVer, theHeaders, theHttpCode, closeConnection)
, IOVec(Parts, 2)
{
}
TContIOVector* GetIOvec() override {
return &IOVec;
}
TContIOVector IOVec;
};
TTcpSocket::TSendedData sd(new TBuffers(data, compressionScheme, ver, headers, httpCode, SeenMessageWithoutKeepalive_));
SendData(requestId, sd);
}
//not thread-safe (called from an outside thread)
void SendError(TAtomicBase requestId, unsigned httpCode, const TString& descr, const THttpVersion& ver, const TString& headers) {
if (Canceled_) {
return;
}
class THttpErrorResponseFormatter {
public:
THttpErrorResponseFormatter(unsigned theHttpCode, const TString& theDescr, const THttpVersion& theVer, bool closeConnection, const TString& headers) {
PrintHttpVersion(Answer, theVer);
Answer << TStringBuf(" ") << theHttpCode << TStringBuf(" ");
if (theDescr.size() && !THttp2Options::ErrorDetailsAsResponseBody) {
// Reason-Phrase = *<TEXT, excluding CR, LF>
// replace bad chars to '.'
TString reasonPhrase = theDescr;
for (TString::iterator it = reasonPhrase.begin(); it != reasonPhrase.end(); ++it) {
char& ch = *it;
if (ch == ' ') {
continue;
}
if (((ch & 31) == ch) || static_cast<unsigned>(ch) == 127 || (static_cast<unsigned>(ch) & 0x80)) {
//CTLs || DEL(127) || non ascii
// (ch <= 32) || (ch >= 127)
ch = '.';
}
}
Answer << reasonPhrase;
} else {
Answer << HttpCodeStr(static_cast<int>(theHttpCode));
}
if (closeConnection) {
Answer << TStringBuf("\r\nConnection: close");
}
if (headers) {
Answer << "\r\n" << headers;
}
if (THttp2Options::ErrorDetailsAsResponseBody) {
Answer << TStringBuf("\r\nContent-Length:") << theDescr.size() << "\r\n\r\n" << theDescr;
} else {
Answer << "\r\n"
"Content-Length:0\r\n\r\n"sv;
}
Parts[0].buf = Answer.Data();
Parts[0].len = Answer.Size();
}
TStringStream Answer;
IOutputStream::TPart Parts[1];
};
class TBuffers: public THttpErrorResponseFormatter, public TTcpSocket::IBuffers {
public:
TBuffers(
unsigned theHttpCode,
const TString& theDescr,
const THttpVersion& theVer,
bool closeConnection,
const TString& headers
)
: THttpErrorResponseFormatter(theHttpCode, theDescr, theVer, closeConnection, headers)
, IOVec(Parts, 1)
{
}
TContIOVector* GetIOvec() override {
return &IOVec;
}
TContIOVector IOVec;
};
TTcpSocket::TSendedData sd(new TBuffers(httpCode, descr, ver, SeenMessageWithoutKeepalive_, headers));
SendData(requestId, sd);
}
void ProcessPipeline() {
// called on a successful response to the current (PrimaryResponse_) request
TAtomicBase requestId;
if (PipelineOrder_.Dequeue(&requestId)) {
TAtomicBase oldReqId;
do {
oldReqId = AtomicGet(PrimaryResponse_);
Y_ABORT_UNLESS(oldReqId, "race inside http pipelining");
} while (!AtomicCas(&PrimaryResponse_, requestId, oldReqId));
ProcessResponsesData();
} else {
TAtomicBase oldReqId = AtomicGet(PrimaryResponse_);
if (oldReqId) {
while (!AtomicCas(&PrimaryResponse_, 0, oldReqId)) {
Y_ABORT_UNLESS(oldReqId == AtomicGet(PrimaryResponse_), "race inside http pipelining [2]");
}
}
}
}
void ProcessResponsesData() {
// process the responses data queue, send the response (if the next PrimaryResponse_ is already available)
TResponseDataRef rd;
while (ResponsesDataQueue_.Dequeue(&rd)) {
ResponsesData_[rd->RequestId_] = rd;
}
TAtomicBase requestId = AtomicGet(PrimaryResponse_);
if (requestId) {
THashMap<TAtomicBase, TResponseDataRef>::iterator it = ResponsesData_.find(requestId);
if (it != ResponsesData_.end()) {
// has next primary response
rd = it->second;
ResponsesData_.erase(it);
AS_->AsyncWrite(rd->Data_, std::bind(&TConn::OnSend, SelfRef(), _1, _2, _3), THttp2Options::ServerOutputDeadline);
}
}
}
private:
void SendData(TAtomicBase requestId, TTcpSocket::TSendedData sd) {
TContIOVector& vec = *sd->GetIOvec();
if (requestId != AtomicGet(PrimaryResponse_)) {
// another request must be responded to first, so push this one to the queue
// + enqueue an event so the queue is checked safely (on the local/transport thread)
TResponseDataRef rdr = new TResponseData(requestId, sd);
ResponsesDataQueue_.Enqueue(rdr);
AS_->GetIOService().Post(std::bind(&TConn::ProcessResponsesData, SelfRef()));
return;
}
if (THttp2Options::ServerUseDirectWrite) {
vec.Proceed(AS_->WriteSome(vec));
}
if (!vec.Complete()) {
DBGOUT("AsyncWrite()");
AS_->AsyncWrite(sd, std::bind(&TConn::OnSend, SelfRef(), _1, _2, _3), THttp2Options::ServerOutputDeadline);
} else {
// run ProcessPipeline on a safe thread
AS_->GetIOService().Post(std::bind(&TConn::ProcessPipeline, SelfRef()));
}
}
void OnSend(const TErrorCode& ec, size_t amount, IHandlingContext&) {
Y_UNUSED(amount);
if (ec) {
OnError();
} else {
ProcessPipeline();
}
if (SeenMessageWithoutKeepalive_) {
TErrorCode shutdown_ec;
AS_->Shutdown(TTcpSocket::ShutdownBoth, shutdown_ec);
}
}
public:
bool IsCanceled() const noexcept {
return Canceled_;
}
const TString& RemoteHost() const noexcept {
return RemoteHost_;
}
private:
TWeakPtrB<TConn> WeakThis_;
THttpServer& HS_;
TTcpSocketRef AS_;
TString RemoteHost_;
size_t BuffSize_;
TArrayHolder<char> Buff_;
TAutoPtr<THttpParser> P_;
// pipeline supporting
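// PrimaryResponse_ holds the id of the request whose response must be sent
// first (0 == none); responses to later pipelined requests wait in
// ResponsesData_ until ProcessPipeline() promotes their id from PipelineOrder_.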
TAtomic PrimaryResponse_ = 0;
TLockFreeQueue<TAtomicBase> PipelineOrder_;
TLockFreeQueue<TResponseDataRef> ResponsesDataQueue_;
THashMap<TAtomicBase, TResponseDataRef> ResponsesData_;
TAtomicBool Canceled_;
TAtomicBool SeenMessageWithoutKeepalive_ = false;
i32 LeftRequestsToDisconnect_ = -1;
};
///////////////////////////////////////////////////////////
public:
THttpServer(IOnRequest* cb, const TParsedLocation& loc)
: E_(THttp2Options::AsioServerThreads)
, CB_(cb)
, LimitRequestsPerConnection(THttp2Options::LimitRequestsPerConnection)
{
TNetworkAddress addr = THttp2Options::RespectHostInHttpServerNetworkAddress ?
TNetworkAddress(TString(loc.Host), loc.GetPort())
: TNetworkAddress(loc.GetPort());
for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) {
TEndpoint ep(new NAddr::TAddrInfo(&*it));
TTcpAcceptorPtr a(new TTcpAcceptor(AcceptExecutor_.GetIOService()));
DBGOUT("bind:" << ep.IpToString() << ":" << ep.Port());
a->Bind(ep);
a->Listen(THttp2Options::Backlog);
StartAccept(a.Get());
A_.push_back(a);
}
}
~THttpServer() override {
AcceptExecutor_.SyncShutdown(); //cancel operations for all current sockets (including acceptors)
A_.clear(); //stop listening
E_.SyncShutdown();
}
void OnAccept(TTcpAcceptor* a, TAtomicSharedPtr<TTcpSocket> s, const TErrorCode& ec, IHandlingContext&) {
if (Y_UNLIKELY(ec)) {
if (ec.Value() == ECANCELED) {
return;
} else if (ec.Value() == EMFILE || ec.Value() == ENFILE || ec.Value() == ENOMEM || ec.Value() == ENOBUFS) {
//reached some os limit, suspend accepting
TAtomicSharedPtr<TDeadlineTimer> dt(new TDeadlineTimer(a->GetIOService()));
dt->AsyncWaitExpireAt(TDuration::Seconds(30), std::bind(&THttpServer::OnTimeoutSuspendAccept, this, a, dt, _1, _2));
return;
} else {
Cdbg << "acc: " << ec.Text() << Endl;
}
} else {
if (static_cast<size_t>(HttpInConnCounter()->Val()) < HttpInConnLimits()->Hard()) {
try {
SetNonBlock(s->Native());
PrepareSocket(s->Native());
TConn::Create(*this, s);
} catch (TSystemError& err) {
TErrorCode ec2(err.Status());
Cdbg << "acc: " << ec2.Text() << Endl;
}
} //otherwise the accepted socket will be closed
}
StartAccept(a); //continue accepting
}
void OnTimeoutSuspendAccept(TTcpAcceptor* a, TAtomicSharedPtr<TDeadlineTimer>, const TErrorCode& ec, IHandlingContext&) {
if (!ec) {
DBGOUT("resume acceptor")
StartAccept(a);
}
}
void OnRequest(IRequest* r) {
try {
CB_->OnRequest(r);
} catch (...) {
Cdbg << CurrentExceptionMessage() << Endl;
}
}
protected:
void OnCreateConn() noexcept {
HttpInConnCounter()->Inc();
}
void OnDestroyConn() noexcept {
HttpInConnCounter()->Dec();
}
TDuration GetKeepAliveTimeout() const noexcept {
size_t cc = HttpInConnCounter()->Val();
TFdLimits lim(*HttpInConnLimits());
if (!TFdLimits::ExceedLimit(cc, lim.Soft())) {
return THttp2Options::ServerInputDeadlineKeepAliveMax;
}
if (cc > lim.Hard()) {
cc = lim.Hard();
}
TDuration::TValue softTuneRange = THttp2Options::ServerInputDeadlineKeepAliveMax.Seconds() - THttp2Options::ServerInputDeadlineKeepAliveMin.Seconds();
return TDuration::Seconds((softTuneRange * (cc - lim.Soft())) / (lim.Hard() - lim.Soft() + 1)) + THttp2Options::ServerInputDeadlineKeepAliveMin;
}
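// Worked example (assumed values, no sanitizer multipliers): with soft=4000,
// hard=5000, Min=10s, Max=120s and cc == 4500 open connections,
// softTuneRange == 110 and the result is
// 10s + (110 * (4500 - 4000)) / (5000 - 4000 + 1) == 10s + 54s == 64s;
// at or below the soft limit the full 120s keep-alive deadline is used.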
private:
void StartAccept(TTcpAcceptor* a) {
TAtomicSharedPtr<TTcpSocket> s(new TTcpSocket(E_.Size() ? E_.GetExecutor().GetIOService() : AcceptExecutor_.GetIOService()));
a->AsyncAccept(*s, std::bind(&THttpServer::OnAccept, this, a, s, _1, _2));
}
TIOServiceExecutor AcceptExecutor_;
TVector<TTcpAcceptorPtr> A_;
TExecutorsPool E_;
IOnRequest* CB_;
public:
const i32 LimitRequestsPerConnection;
};
template <class T>
class THttp2Protocol: public IProtocol {
public:
IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
return new THttpServer(cb, loc);
}
THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
THttpRequest::THandleRef ret(new THttpRequest::THandle(fallback, msg, !ss ? nullptr : new TStatCollector(ss)));
try {
THttpRequest::Run(ret, msg, &T::Build, T::RequestSettings());
} catch (...) {
ret->ResetOnRecv();
throw;
}
return ret.Get();
}
TStringBuf Scheme() const noexcept override {
return T::Name();
}
bool SetOption(TStringBuf name, TStringBuf value) override {
return THttp2Options::Set(name, value);
}
};
}
namespace NNeh {
IProtocol* Http1Protocol() {
return Singleton<THttp2Protocol<TRequestGet1>>();
}
IProtocol* Post1Protocol() {
return Singleton<THttp2Protocol<TRequestPost1>>();
}
IProtocol* Full1Protocol() {
return Singleton<THttp2Protocol<TRequestFull1>>();
}
IProtocol* Http2Protocol() {
return Singleton<THttp2Protocol<TRequestGet2>>();
}
IProtocol* Post2Protocol() {
return Singleton<THttp2Protocol<TRequestPost2>>();
}
IProtocol* Full2Protocol() {
return Singleton<THttp2Protocol<TRequestFull2>>();
}
IProtocol* UnixSocketGetProtocol() {
return Singleton<THttp2Protocol<TRequestUnixSocketGet>>();
}
IProtocol* UnixSocketPostProtocol() {
return Singleton<THttp2Protocol<TRequestUnixSocketPost>>();
}
IProtocol* UnixSocketFullProtocol() {
return Singleton<THttp2Protocol<TRequestUnixSocketFull>>();
}
void SetHttp2OutputConnectionsLimits(size_t softLimit, size_t hardLimit) {
HttpConnManager()->SetLimits(softLimit, hardLimit);
}
void SetHttp2InputConnectionsLimits(size_t softLimit, size_t hardLimit) {
HttpInConnLimits()->SetSoft(softLimit);
HttpInConnLimits()->SetHard(hardLimit);
}
TAtomicBase GetHttpOutputConnectionCount() {
return HttpOutConnCounter()->Val();
}
std::pair<size_t, size_t> GetHttpOutputConnectionLimits() {
return HttpConnManager()->GetLimits();
}
TAtomicBase GetHttpInputConnectionCount() {
return HttpInConnCounter()->Val();
}
void SetHttp2InputConnectionsTimeouts(unsigned minSeconds, unsigned maxSeconds) {
THttp2Options::ServerInputDeadlineKeepAliveMin = TDuration::Seconds(minSeconds);
THttp2Options::ServerInputDeadlineKeepAliveMax = TDuration::Seconds(maxSeconds);
}
class TUnixSocketResolver {
public:
NDns::TResolvedHost* Resolve(const TString& path) {
TString unixSocketPath = path;
if (path.size() > 2 && path[0] == '[' && path[path.size() - 1] == ']') {
unixSocketPath = path.substr(1, path.size() - 2);
}
if (auto resolvedUnixSocket = ResolvedUnixSockets_.FindPtr(unixSocketPath)) {
return resolvedUnixSocket->Get();
}
TNetworkAddress na{TUnixSocketPath(unixSocketPath)};
ResolvedUnixSockets_[unixSocketPath] = MakeHolder<NDns::TResolvedHost>(unixSocketPath, na);
return ResolvedUnixSockets_[unixSocketPath].Get();
}
private:
THashMap<TString, THolder<NDns::TResolvedHost>> ResolvedUnixSockets_;
};
TUnixSocketResolver* UnixSocketResolver() {
return FastTlsSingleton<TUnixSocketResolver>();
}
const NDns::TResolvedHost* Resolve(const TStringBuf host, ui16 port, NHttp::EResolverType resolverType) {
if (resolverType == EResolverType::EUNIXSOCKET) {
return UnixSocketResolver()->Resolve(TString(host));
}
return NDns::CachedResolve(NDns::TResolveInfo(host, port));
}
}