| | | |
|---|---|---|
| author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
| committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
| commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
| tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/http2.cpp | |
| parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
| download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz | |
fix ya.make
Diffstat (limited to 'library/cpp/neh/http2.cpp')
| -rw-r--r-- | library/cpp/neh/http2.cpp | 2102 |
1 file changed, 2102 insertions, 0 deletions
diff --git a/library/cpp/neh/http2.cpp b/library/cpp/neh/http2.cpp
new file mode 100644
index 0000000000..0bba29cf22
--- /dev/null
+++ b/library/cpp/neh/http2.cpp
@@ -0,0 +1,2102 @@
+#include "http2.h"
+
+#include "conn_cache.h"
+#include "details.h"
+#include "factory.h"
+#include "http_common.h"
+#include "smart_ptr.h"
+#include "utils.h"
+
+#include <library/cpp/http/push_parser/http_parser.h>
+#include <library/cpp/http/misc/httpcodes.h>
+#include <library/cpp/http/misc/parsed_request.h>
+#include <library/cpp/neh/asio/executor.h>
+
+#include <util/generic/singleton.h>
+#include <util/generic/vector.h>
+#include <util/network/iovec.h>
+#include <util/stream/output.h>
+#include <util/stream/zlib.h>
+#include <util/system/condvar.h>
+#include <util/system/mutex.h>
+#include <util/system/spinlock.h>
+#include <util/system/yassert.h>
+#include <util/thread/factory.h>
+#include <util/thread/singleton.h>
+#include <util/system/sanitizers.h>
+
+#include <atomic>
+
+#if defined(_unix_)
+#include <sys/ioctl.h>
+#endif
+
+#if defined(_linux_)
+#undef SIOCGSTAMP
+#undef SIOCGSTAMPNS
+#include <linux/sockios.h>
+#define FIONWRITE SIOCOUTQ
+#endif
+
+//#define DEBUG_HTTP2
+
+#ifdef DEBUG_HTTP2
+#define DBGOUT(args) Cout << args << Endl;
+#else
+#define DBGOUT(args)
+#endif
+
+using namespace NDns;
+using namespace NAsio;
+using namespace NNeh;
+using namespace NNeh::NHttp;
+using namespace NNeh::NHttp2;
+using namespace std::placeholders;
+
+//
+// has complex keep-alive references between entities in a multi-threaded environment,
+// which creates risks of races, memory leaks, etc.,
+// so connecting/disconnecting entities must be done carefully
+//
+// handler <=-> request <==> connection(socket) <= handlers, stored in io_service
+//                                ^
+//                                +== connections_cache
+// '=>' -- shared/intrusive ptr
+// '->' -- weak_ptr
+//
+
+static TDuration FixTimeoutForSanitizer(const TDuration timeout) {
+    ui64 multiplier = 1;
+    if (NSan::ASanIsOn()) {
+        // https://github.com/google/sanitizers/wiki/AddressSanitizer
+        multiplier = 4;
+    } else if (NSan::MSanIsOn()) {
+        // via https://github.com/google/sanitizers/wiki/MemorySanitizer
+        multiplier = 3;
+    } else if (NSan::TSanIsOn()) {
+        // via https://clang.llvm.org/docs/ThreadSanitizer.html
+        multiplier = 15;
+    }
+
+    return TDuration::FromValue(timeout.GetValue() * multiplier);
+}
+
+TDuration THttp2Options::ConnectTimeout = FixTimeoutForSanitizer(TDuration::MilliSeconds(1000));
+TDuration THttp2Options::InputDeadline = TDuration::Max();
+TDuration THttp2Options::OutputDeadline = TDuration::Max();
+TDuration THttp2Options::SymptomSlowConnect = FixTimeoutForSanitizer(TDuration::MilliSeconds(10));
+size_t THttp2Options::InputBufferSize = 16 * 1024;
+bool THttp2Options::KeepInputBufferForCachedConnections = false;
+size_t THttp2Options::AsioThreads = 4;
+size_t THttp2Options::AsioServerThreads = 4;
+bool THttp2Options::EnsureSendingCompleteByAck = false;
+int THttp2Options::Backlog = 100;
+TDuration THttp2Options::ServerInputDeadline = FixTimeoutForSanitizer(TDuration::MilliSeconds(500));
+TDuration THttp2Options::ServerOutputDeadline = TDuration::Max();
+TDuration THttp2Options::ServerInputDeadlineKeepAliveMax = FixTimeoutForSanitizer(TDuration::Seconds(120));
+TDuration THttp2Options::ServerInputDeadlineKeepAliveMin = FixTimeoutForSanitizer(TDuration::Seconds(10));
+bool THttp2Options::ServerUseDirectWrite = false;
+bool THttp2Options::UseResponseAsErrorMessage = false;
+bool THttp2Options::FullHeadersAsErrorMessage = false;
+bool THttp2Options::ErrorDetailsAsResponseBody = false;
+bool THttp2Options::RedirectionNotError = false;
+bool THttp2Options::AnyResponseIsNotError = false;
+bool THttp2Options::TcpKeepAlive = false;
+i32 THttp2Options::LimitRequestsPerConnection = -1;
+bool THttp2Options::QuickAck = false;
+bool THttp2Options::UseAsyncSendRequest = false;
+
+bool THttp2Options::Set(TStringBuf name, TStringBuf value) {
+#define HTTP2_TRY_SET(optType, optName) \
+    if (name == TStringBuf(#optName)) { \
+        optName = FromString<optType>(value); \
+    }
+
+    HTTP2_TRY_SET(TDuration, ConnectTimeout)
+    else HTTP2_TRY_SET(TDuration, InputDeadline)
+    else HTTP2_TRY_SET(TDuration, OutputDeadline)
+    else HTTP2_TRY_SET(TDuration, SymptomSlowConnect)
+    else HTTP2_TRY_SET(size_t, InputBufferSize)
+    else HTTP2_TRY_SET(bool, KeepInputBufferForCachedConnections)
+    else HTTP2_TRY_SET(size_t, AsioThreads)
+    else HTTP2_TRY_SET(size_t, AsioServerThreads)
+    else HTTP2_TRY_SET(bool, EnsureSendingCompleteByAck)
+    else HTTP2_TRY_SET(int, Backlog)
+    else HTTP2_TRY_SET(TDuration, ServerInputDeadline)
+    else HTTP2_TRY_SET(TDuration, ServerOutputDeadline)
+    else HTTP2_TRY_SET(TDuration, ServerInputDeadlineKeepAliveMax)
+    else HTTP2_TRY_SET(TDuration, ServerInputDeadlineKeepAliveMin)
+    else HTTP2_TRY_SET(bool, ServerUseDirectWrite)
+    else HTTP2_TRY_SET(bool, UseResponseAsErrorMessage)
+    else HTTP2_TRY_SET(bool, FullHeadersAsErrorMessage)
+    else HTTP2_TRY_SET(bool, ErrorDetailsAsResponseBody)
+    else HTTP2_TRY_SET(bool, RedirectionNotError)
+    else HTTP2_TRY_SET(bool, AnyResponseIsNotError)
+    else HTTP2_TRY_SET(bool, TcpKeepAlive)
+    else HTTP2_TRY_SET(i32, LimitRequestsPerConnection)
+    else HTTP2_TRY_SET(bool, QuickAck)
+    else HTTP2_TRY_SET(bool, UseAsyncSendRequest)
+    else {
+        return false;
+    }
+    return true;
+}
+
+namespace NNeh {
+    const NDns::TResolvedHost* Resolve(const TStringBuf host, ui16 port, NHttp::EResolverType resolverType);
+}
+
+namespace {
+//#define DEBUG_STAT
+
+#ifdef DEBUG_STAT
+    struct TDebugStat {
+        static std::atomic<size_t> ConnTotal;
+        static std::atomic<size_t> ConnActive;
+        static std::atomic<size_t> ConnCached;
+        static std::atomic<size_t> ConnDestroyed;
+        static std::atomic<size_t> ConnFailed;
+        static std::atomic<size_t> ConnConnCanceled;
+        static std::atomic<size_t> ConnSlow;
+        static std::atomic<size_t> Conn2Success;
+        static std::atomic<size_t> ConnPurgedInCache;
+        static std::atomic<size_t> ConnDestroyedInCache;
+        static std::atomic<size_t> RequestTotal;
+        static std::atomic<size_t> RequestSuccessed;
+        static std::atomic<size_t> RequestFailed;
+        static void Print() {
+            Cout << "ct=" << ConnTotal.load(std::memory_order_acquire)
+                 << " ca=" << ConnActive.load(std::memory_order_acquire)
+                 << " cch=" << ConnCached.load(std::memory_order_acquire)
+                 << " cd=" << ConnDestroyed.load(std::memory_order_acquire)
+                 << " cf=" << ConnFailed.load(std::memory_order_acquire)
+                 << " ccc=" << ConnConnCanceled.load(std::memory_order_acquire)
+                 << " csl=" << ConnSlow.load(std::memory_order_acquire)
+                 << " c2s=" << Conn2Success.load(std::memory_order_acquire)
+                 << " cpc=" << ConnPurgedInCache.load(std::memory_order_acquire)
+                 << " cdc=" << ConnDestroyedInCache.load(std::memory_order_acquire)
+                 << " rt=" << RequestTotal.load(std::memory_order_acquire)
+                 << " rs=" << RequestSuccessed.load(std::memory_order_acquire)
+                 << " rf=" << RequestFailed.load(std::memory_order_acquire)
+                 << Endl;
+        }
+    };
+    std::atomic<size_t> TDebugStat::ConnTotal = 0;
+    std::atomic<size_t> TDebugStat::ConnActive = 0;
+    std::atomic<size_t> TDebugStat::ConnCached = 0;
+    std::atomic<size_t> TDebugStat::ConnDestroyed = 0;
+    std::atomic<size_t>
TDebugStat::ConnFailed = 0; + std::atomic<size_t> TDebugStat::ConnConnCanceled = 0; + std::atomic<size_t> TDebugStat::ConnSlow = 0; + std::atomic<size_t> TDebugStat::Conn2Success = 0; + std::atomic<size_t> TDebugStat::ConnPurgedInCache = 0; + std::atomic<size_t> TDebugStat::ConnDestroyedInCache = 0; + std::atomic<size_t> TDebugStat::RequestTotal = 0; + std::atomic<size_t> TDebugStat::RequestSuccessed = 0; + std::atomic<size_t> TDebugStat::RequestFailed = 0; +#endif + + inline void PrepareSocket(SOCKET s, const TRequestSettings& requestSettings = TRequestSettings()) { + if (requestSettings.NoDelay) { + SetNoDelay(s, true); + } + } + + bool Compress(TData& data, const TString& compressionScheme) { + if (compressionScheme == "gzip") { + try { + TData gzipped(data.size()); + TMemoryOutput out(gzipped.data(), gzipped.size()); + TZLibCompress c(&out, ZLib::GZip); + c.Write(data.data(), data.size()); + c.Finish(); + gzipped.resize(out.Buf() - gzipped.data()); + data.swap(gzipped); + return true; + } catch (yexception&) { + // gzipped data occupies more space than original data + } + } + return false; + } + + class THttpRequestBuffers: public NAsio::TTcpSocket::IBuffers { + public: + THttpRequestBuffers(TRequestData::TPtr rd) + : Req_(rd) + , Parts_(Req_->Parts()) + , IOvec_(Parts_.data(), Parts_.size()) + { + } + + TContIOVector* GetIOvec() override { + return &IOvec_; + } + + private: + TRequestData::TPtr Req_; + TVector<IOutputStream::TPart> Parts_; + TContIOVector IOvec_; + }; + + struct TRequestGet1: public TRequestGet { + static inline TStringBuf Name() noexcept { + return TStringBuf("http"); + } + }; + + struct TRequestPost1: public TRequestPost { + static inline TStringBuf Name() noexcept { + return TStringBuf("post"); + } + }; + + struct TRequestFull1: public TRequestFull { + static inline TStringBuf Name() noexcept { + return TStringBuf("full"); + } + }; + + struct TRequestGet2: public TRequestGet { + static inline TStringBuf Name() noexcept { + return TStringBuf("http2"); + } + }; + + struct TRequestPost2: public TRequestPost { + static inline TStringBuf Name() noexcept { + return TStringBuf("post2"); + } + }; + + struct TRequestFull2: public TRequestFull { + static inline TStringBuf Name() noexcept { + return TStringBuf("full2"); + } + }; + + struct TRequestUnixSocketGet: public TRequestGet { + static inline TStringBuf Name() noexcept { + return TStringBuf("http+unix"); + } + + static TRequestSettings RequestSettings() { + return TRequestSettings() + .SetNoDelay(false) + .SetResolverType(EResolverType::EUNIXSOCKET); + } + }; + + struct TRequestUnixSocketPost: public TRequestPost { + static inline TStringBuf Name() noexcept { + return TStringBuf("post+unix"); + } + + static TRequestSettings RequestSettings() { + return TRequestSettings() + .SetNoDelay(false) + .SetResolverType(EResolverType::EUNIXSOCKET); + } + }; + + struct TRequestUnixSocketFull: public TRequestFull { + static inline TStringBuf Name() noexcept { + return TStringBuf("full+unix"); + } + + static TRequestSettings RequestSettings() { + return TRequestSettings() + .SetNoDelay(false) + .SetResolverType(EResolverType::EUNIXSOCKET); + } + }; + + typedef TAutoPtr<THttpRequestBuffers> THttpRequestBuffersPtr; + + class THttpRequest; + typedef TSharedPtrB<THttpRequest> THttpRequestRef; + + class THttpConn; + typedef TIntrusivePtr<THttpConn> THttpConnRef; + + typedef std::function<TRequestData::TPtr(const TMessage&, const TParsedLocation&)> TRequestBuilder; + + class THttpRequest { + public: + class THandle: public TSimpleHandle { 
+ public: + THandle(IOnRecv* f, const TMessage& msg, TStatCollector* s) noexcept + : TSimpleHandle(f, msg, s) + { + } + + bool MessageSendedCompletely() const noexcept override { + if (TSimpleHandle::MessageSendedCompletely()) { + return true; + } + + THttpRequestRef req(GetRequest()); + if (!!req && req->RequestSendedCompletely()) { + const_cast<THandle*>(this)->SetSendComplete(); + } + + return TSimpleHandle::MessageSendedCompletely(); + } + + void Cancel() noexcept override { + if (TSimpleHandle::Canceled()) { + return; + } + + THttpRequestRef req(GetRequest()); + if (!!req) { + TSimpleHandle::Cancel(); + req->Cancel(); + } + } + + void NotifyError(TErrorRef error, const THttpParser* rsp = nullptr) { +#ifdef DEBUG_STAT + ++TDebugStat::RequestFailed; +#endif + if (rsp) { + TSimpleHandle::NotifyError(error, rsp->DecodedContent(), rsp->FirstLine(), rsp->Headers()); + } else { + TSimpleHandle::NotifyError(error); + } + + ReleaseRequest(); + } + + //not thread safe! + void SetRequest(const TWeakPtrB<THttpRequest>& r) noexcept { + Req_ = r; + } + + private: + THttpRequestRef GetRequest() const noexcept { + TGuard<TSpinLock> g(SP_); + return Req_; + } + + void ReleaseRequest() noexcept { + TWeakPtrB<THttpRequest> tmp; + TGuard<TSpinLock> g(SP_); + tmp.Swap(Req_); + } + + mutable TSpinLock SP_; + TWeakPtrB<THttpRequest> Req_; + }; + + typedef TIntrusivePtr<THandle> THandleRef; + + static void Run(THandleRef& h, const TMessage& msg, TRequestBuilder f, const TRequestSettings& s) { + THttpRequestRef req(new THttpRequest(h, msg, f, s)); + req->WeakThis_ = req; + h->SetRequest(req->WeakThis_); + req->Run(req); + } + + ~THttpRequest() { + DBGOUT("~THttpRequest()"); + } + + private: + THttpRequest(THandleRef& h, TMessage msg, TRequestBuilder f, const TRequestSettings& s) + : Hndl_(h) + , RequestBuilder_(f) + , RequestSettings_(s) + , Msg_(std::move(msg)) + , Loc_(Msg_.Addr) + , Addr_(Resolve(Loc_.Host, Loc_.GetPort(), RequestSettings_.ResolverType)) + , AddrIter_(Addr_->Addr.Begin()) + , Canceled_(false) + , RequestSendedCompletely_(false) + { + } + + void Run(THttpRequestRef& req); + + public: + THttpRequestBuffersPtr BuildRequest() { + return new THttpRequestBuffers(RequestBuilder_(Msg_, Loc_)); + } + + TRequestSettings RequestSettings() { + return RequestSettings_; + } + + //can create a spare socket in an attempt to decrease connecting time + void OnDetectSlowConnecting(); + + //remove extra connection on success connec + void OnConnect(THttpConn* c); + + //have some response input + void OnBeginRead() noexcept { + RequestSendedCompletely_ = true; + } + + void OnResponse(TAutoPtr<THttpParser>& rsp); + + void OnConnectFailed(THttpConn* c, const TErrorCode& ec); + void OnSystemError(THttpConn* c, const TErrorCode& ec); + void OnError(THttpConn* c, const TString& errorText); + + bool RequestSendedCompletely() noexcept; + + void Cancel() noexcept; + + private: + void NotifyResponse(const TString& resp, const TString& firstLine, const THttpHeaders& headers) { + THandleRef h(ReleaseHandler()); + if (!!h) { + h->NotifyResponse(resp, firstLine, headers); + } + } + + void NotifyError( + const TString& errorText, + TError::TType errorType = TError::UnknownType, + i32 errorCode = 0, i32 systemErrorCode = 0) { + NotifyError(new TError(errorText, errorType, errorCode, systemErrorCode)); + } + + void NotifyError(TErrorRef error, const THttpParser* rsp = nullptr) { + THandleRef h(ReleaseHandler()); + if (!!h) { + h->NotifyError(error, rsp); + } + } + + void Finalize(THttpConn* skipConn = nullptr) noexcept; + + 
inline THandleRef ReleaseHandler() noexcept { + THandleRef h; + { + TGuard<TSpinLock> g(SL_); + h.Swap(Hndl_); + } + return h; + } + + inline THttpConnRef GetConn() noexcept { + TGuard<TSpinLock> g(SL_); + return Conn_; + } + + inline THttpConnRef ReleaseConn() noexcept { + THttpConnRef c; + { + TGuard<TSpinLock> g(SL_); + c.Swap(Conn_); + } + return c; + } + + inline THttpConnRef ReleaseConn2() noexcept { + THttpConnRef c; + { + TGuard<TSpinLock> g(SL_); + c.Swap(Conn2_); + } + return c; + } + + TSpinLock SL_; //guaranted calling notify() only once (prevent race between asio thread and current) + THandleRef Hndl_; + TRequestBuilder RequestBuilder_; + TRequestSettings RequestSettings_; + const TMessage Msg_; + const TParsedLocation Loc_; + const TResolvedHost* Addr_; + TNetworkAddress::TIterator AddrIter_; + THttpConnRef Conn_; + THttpConnRef Conn2_; //concurrent connection used, if detected slow connecting on first connection + TWeakPtrB<THttpRequest> WeakThis_; + TAtomicBool Canceled_; + TAtomicBool RequestSendedCompletely_; + }; + + TAtomicCounter* HttpOutConnCounter(); + + class THttpConn: public TThrRefBase { + public: + static THttpConnRef Create(TIOService& srv); + + ~THttpConn() override { + DBGOUT("~THttpConn()"); + Req_.Reset(); + HttpOutConnCounter()->Dec(); +#ifdef DEBUG_STAT + ++TDebugStat::ConnDestroyed; +#endif + } + + void StartRequest(THttpRequestRef req, const TEndpoint& ep, size_t addrId, TDuration slowConn, bool useAsyncSendRequest = false) { + { + //thread safe linking connection->request + TGuard<TSpinLock> g(SL_); + Req_ = req; + } + AddrId_ = addrId; + try { + TDuration connectDeadline(THttp2Options::ConnectTimeout); + if (THttp2Options::ConnectTimeout > slowConn) { + //use append non fatal connect deadline, so on first timedout + //report about slow connecting to THttpRequest, and continue wait ConnectDeadline_ period + connectDeadline = slowConn; + ConnectDeadline_ = THttp2Options::ConnectTimeout - slowConn; + } + DBGOUT("AsyncConnect to " << ep.IpToString()); + AS_.AsyncConnect(ep, std::bind(&THttpConn::OnConnect, THttpConnRef(this), _1, _2, useAsyncSendRequest), connectDeadline); + } catch (...) { + ReleaseRequest(); + throw; + } + } + + //start next request on keep-alive connection + bool StartNextRequest(THttpRequestRef& req, bool useAsyncSendRequest = false) { + if (Finalized_) { + return false; + } + + { + //thread safe linking connection->request + TGuard<TSpinLock> g(SL_); + Req_ = req; + } + + RequestWritten_ = false; + BeginReadResponse_ = false; + + try { + if (!useAsyncSendRequest) { + TErrorCode ec; + SendRequest(req->BuildRequest(), ec); //throw std::bad_alloc + if (ec.Value() == ECANCELED) { + OnCancel(); + } else if (ec) { + OnError(ec); + } + } else { + SendRequestAsync(req->BuildRequest()); //throw std::bad_alloc + } + } catch (...) { + OnError(CurrentExceptionMessage()); + throw; + } + return true; + } + + //connection received from cache must be validated before using + //(process removing closed conection from cache consume some time) + inline bool IsValid() const noexcept { + return !Finalized_; + } + + void SetCached(bool v) noexcept { + Cached_ = v; + } + + void Close() noexcept { + try { + Cancel(); + } catch (...) 
{ + } + } + + void DetachRequest() noexcept { + ReleaseRequest(); + } + + void Cancel() { //throw std::bad_alloc + if (!Canceled_) { + Canceled_ = true; + Finalized_ = true; + OnCancel(); + AS_.AsyncCancel(); + } + } + + void OnCancel() { + THttpRequestRef r(ReleaseRequest()); + if (!!r) { + static const TString reqCanceled("request canceled"); + r->OnError(this, reqCanceled); + } + } + + bool RequestSendedCompletely() const noexcept { + DBGOUT("RequestSendedCompletely()"); + if (!Connected_ || !RequestWritten_) { + return false; + } + if (BeginReadResponse_) { + return true; + } +#if defined(FIONWRITE) + if (THttp2Options::EnsureSendingCompleteByAck) { + int nbytes = Max<int>(); + int err = ioctl(AS_.Native(), FIONWRITE, &nbytes); + return err ? false : nbytes == 0; + } +#endif + return true; + } + + TIOService& GetIOService() const noexcept { + return AS_.GetIOService(); + } + + private: + THttpConn(TIOService& srv) + : AddrId_(0) + , AS_(srv) + , BuffSize_(THttp2Options::InputBufferSize) + , Connected_(false) + , Cached_(false) + , Canceled_(false) + , Finalized_(false) + , InAsyncRead_(false) + , RequestWritten_(false) + , BeginReadResponse_(false) + { + HttpOutConnCounter()->Inc(); + } + + //can be called only from asio + void OnConnect(const TErrorCode& ec, IHandlingContext& ctx, bool useAsyncSendRequest = false) { + DBGOUT("THttpConn::OnConnect: " << ec.Value()); + if (Y_UNLIKELY(ec)) { + if (ec.Value() == ETIMEDOUT && ConnectDeadline_.GetValue()) { + //detect slow connecting (yet not reached final timeout) + DBGOUT("OnConnectTimingCheck"); + THttpRequestRef req(GetRequest()); + if (!req) { + return; //cancel from client thread can ahead us + } + TDuration newDeadline(ConnectDeadline_); + ConnectDeadline_ = TDuration::Zero(); //next timeout is final + + req->OnDetectSlowConnecting(); + //continue wait connect + ctx.ContinueUseHandler(newDeadline); + + return; + } +#ifdef DEBUG_STAT + if (ec.Value() != ECANCELED) { + ++TDebugStat::ConnFailed; + } else { + ++TDebugStat::ConnConnCanceled; + } +#endif + if (ec.Value() == EIO) { + //try get more detail error info + char buf[1]; + TErrorCode errConnect; + AS_.ReadSome(buf, 1, errConnect); + OnConnectFailed(errConnect.Value() ? 
errConnect : ec); + } else if (ec.Value() == ECANCELED) { + // not try connecting to next host ip addr, simple fail + OnError(ec); + } else { + OnConnectFailed(ec); + } + } else { + Connected_ = true; + + THttpRequestRef req(GetRequest()); + if (!req || Canceled_) { + return; + } + + try { + PrepareSocket(AS_.Native(), req->RequestSettings()); + if (THttp2Options::TcpKeepAlive) { + SetKeepAlive(AS_.Native(), true); + } + } catch (TSystemError& err) { + TErrorCode ec2(err.Status()); + OnError(ec2); + return; + } + + req->OnConnect(this); + + THttpRequestBuffersPtr ptr(req->BuildRequest()); + PrepareParser(); + + if (!useAsyncSendRequest) { + TErrorCode ec3; + SendRequest(ptr, ec3); + if (ec3) { + OnError(ec3); + } + } else { + SendRequestAsync(ptr); + } + } + } + + void PrepareParser() { + Prs_ = new THttpParser(); + Prs_->Prepare(); + } + + void SendRequest(const THttpRequestBuffersPtr& bfs, TErrorCode& ec) { //throw std::bad_alloc + if (!THttp2Options::UseAsyncSendRequest) { + size_t amount = AS_.WriteSome(*bfs->GetIOvec(), ec); + + if (ec && ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK && ec.Value() != EINPROGRESS) { + return; + } + ec.Assign(0); + + bfs->GetIOvec()->Proceed(amount); + + if (bfs->GetIOvec()->Complete()) { + RequestWritten_ = true; + StartRead(); + } else { + SendRequestAsync(bfs); + } + } else { + SendRequestAsync(bfs); + } + } + + void SendRequestAsync(const THttpRequestBuffersPtr& bfs) { + NAsio::TTcpSocket::TSendedData sd(bfs.Release()); + AS_.AsyncWrite(sd, std::bind(&THttpConn::OnWrite, THttpConnRef(this), _1, _2, _3), THttp2Options::OutputDeadline); + } + + void OnWrite(const TErrorCode& err, size_t amount, IHandlingContext& ctx) { + Y_UNUSED(amount); + Y_UNUSED(ctx); + if (err) { + OnError(err); + } else { + DBGOUT("OnWrite()"); + RequestWritten_ = true; + StartRead(); + } + } + + inline void StartRead() { + if (!InAsyncRead_ && !Canceled_) { + InAsyncRead_ = true; + AS_.AsyncPollRead(std::bind(&THttpConn::OnCanRead, THttpConnRef(this), _1, _2), THttp2Options::InputDeadline); + } + } + + //can be called only from asio + void OnReadSome(const TErrorCode& err, size_t bytes, IHandlingContext& ctx) { + if (Y_UNLIKELY(err)) { + OnError(err); + return; + } + if (!BeginReadResponse_) { + //used in MessageSendedCompletely() + BeginReadResponse_ = true; + THttpRequestRef r(GetRequest()); + if (!!r) { + r->OnBeginRead(); + } + } + DBGOUT("receive:" << TStringBuf(Buff_.Get(), bytes)); + try { + if (!Prs_) { + throw yexception() << TStringBuf("receive some data while not in request"); + } + +#if defined(_linux_) + if (THttp2Options::QuickAck) { + SetSockOpt(AS_.Native(), SOL_TCP, TCP_QUICKACK, (int)1); + } +#endif + + DBGOUT("parse:"); + while (!Prs_->Parse(Buff_.Get(), bytes)) { + if (BuffSize_ == bytes) { + TErrorCode ec; + bytes = AS_.ReadSome(Buff_.Get(), BuffSize_, ec); + + if (!ec) { + continue; + } + + if (ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK) { + OnError(ec); + + return; + } + } + //continue async. read from socket + ctx.ContinueUseHandler(THttp2Options::InputDeadline); + + return; + } + + //succesfully reach end of http response + THttpRequestRef r(ReleaseRequest()); + if (!r) { + //lost race to req. canceling + DBGOUT("connection failed"); + return; + } + + DBGOUT("response:"); + bool keepALive = Prs_->IsKeepAlive(); + + r->OnResponse(Prs_); + + if (!keepALive) { + return; + } + + //continue use connection (keep-alive mode) + PrepareParser(); + + if (!THttp2Options::KeepInputBufferForCachedConnections) { + Buff_.Destroy(); + } + //continue async. 
read from socket + ctx.ContinueUseHandler(THttp2Options::InputDeadline); + + PutSelfToCache(); + } catch (...) { + OnError(CurrentExceptionMessage()); + } + } + + void PutSelfToCache(); + + //method for reaction on input data for re-used keep-alive connection, + //which free/release buffer when was placed in cache + void OnCanRead(const TErrorCode& err, IHandlingContext& ctx) { + if (Y_UNLIKELY(err)) { + OnError(err); + } else { + if (!Buff_) { + Buff_.Reset(new char[BuffSize_]); + } + TErrorCode ec; + OnReadSome(ec, AS_.ReadSome(Buff_.Get(), BuffSize_, ec), ctx); + } + } + + //unlink connection and request, thread-safe mark connection as non valid + inline THttpRequestRef GetRequest() noexcept { + TGuard<TSpinLock> g(SL_); + return Req_; + } + + inline THttpRequestRef ReleaseRequest() noexcept { + THttpRequestRef r; + { + TGuard<TSpinLock> g(SL_); + r.Swap(Req_); + } + return r; + } + + void OnConnectFailed(const TErrorCode& ec); + + inline void OnError(const TErrorCode& ec) { + OnError(ec.Text()); + } + + inline void OnError(const TString& errText); + + size_t AddrId_; + NAsio::TTcpSocket AS_; + TArrayHolder<char> Buff_; //input buffer + const size_t BuffSize_; + + TAutoPtr<THttpParser> Prs_; //input data parser & parsed info storage + + TSpinLock SL_; + THttpRequestRef Req_; //current request + TDuration ConnectDeadline_; + TAtomicBool Connected_; + TAtomicBool Cached_; + TAtomicBool Canceled_; + TAtomicBool Finalized_; + + bool InAsyncRead_; + TAtomicBool RequestWritten_; + TAtomicBool BeginReadResponse_; + }; + + //conn limits monitoring, cache clean, contain used in http clients asio threads/executors + class THttpConnManager: public IThreadFactory::IThreadAble { + public: + THttpConnManager() + : TotalConn(0) + , EP_(THttp2Options::AsioThreads) + , InPurging_(0) + , MaxConnId_(0) + , Shutdown_(false) + { + T_ = SystemThreadFactory()->Run(this); + Limits.SetSoft(40000); + Limits.SetHard(50000); + } + + ~THttpConnManager() override { + { + TGuard<TMutex> g(PurgeMutex_); + + Shutdown_ = true; + CondPurge_.Signal(); + } + + EP_.SyncShutdown(); + + T_->Join(); + } + + inline void SetLimits(size_t softLimit, size_t hardLimit) noexcept { + Limits.SetSoft(softLimit); + Limits.SetHard(hardLimit); + } + + inline std::pair<size_t, size_t> GetLimits() const noexcept { + return {Limits.Soft(), Limits.Hard()}; + } + + inline void CheckLimits() { + if (ExceedSoftLimit()) { + SuggestPurgeCache(); + + if (ExceedHardLimit()) { + Y_FAIL("neh::http2 output connections limit reached"); + //ythrow yexception() << "neh::http2 output connections limit reached"; + } + } + } + + inline bool Get(THttpConnRef& conn, size_t addrId) { +#ifdef DEBUG_STAT + TDebugStat::ConnTotal.store(TotalConn.Val(), std::memory_order_release); + TDebugStat::ConnActive(Active(), std::memory_order_release); + TDebugStat::ConnCached(Cache_.Size(), std::memory_order_release); +#endif + return Cache_.Get(conn, addrId); + } + + inline void Put(THttpConnRef& conn, size_t addrId) { + if (Y_LIKELY(!Shutdown_ && !ExceedHardLimit() && !CacheDisabled())) { + if (Y_UNLIKELY(addrId > (size_t)AtomicGet(MaxConnId_))) { + AtomicSet(MaxConnId_, addrId); + } + Cache_.Put(conn, addrId); + } else { + conn->Close(); + conn.Drop(); + } + } + + inline size_t OnConnError(size_t addrId) { + return Cache_.Validate(addrId); + } + + TIOService& GetIOService() { + return EP_.GetExecutor().GetIOService(); + } + + bool CacheDisabled() const { + return Limits.Soft() == 0; + } + + bool IsShutdown() const noexcept { + return Shutdown_; + } + + TAtomicCounter 
TotalConn;
+
+    private:
+        inline size_t Total() const noexcept {
+            return TotalConn.Val();
+        }
+
+        inline size_t Active() const noexcept {
+            return TFdLimits::ExceedLimit(Total(), Cache_.Size());
+        }
+
+        inline size_t ExceedSoftLimit() const noexcept {
+            return TFdLimits::ExceedLimit(Total(), Limits.Soft());
+        }
+
+        inline size_t ExceedHardLimit() const noexcept {
+            return TFdLimits::ExceedLimit(Total(), Limits.Hard());
+        }
+
+        void SuggestPurgeCache() {
+            if (AtomicTryLock(&InPurging_)) {
+                //evaluate the usefulness of purging the cache
+                //if the cache holds only a few connections (< MaxConnId_/16 or 64), do not purge it
+                if (Cache_.Size() > (Min((size_t)AtomicGet(MaxConnId_), (size_t)1024U) >> 4)) {
+                    //the closer we get to the hard limit, the closer the need to purge the cache gets to 100%
+                    size_t closenessToHardLimit256 = ((Active() + 1) << 8) / (Limits.Delta() + 1);
+                    //the more connections sit in the cache rather than in use, the less the cache is needed (so it can be purged)
+                    size_t cacheUselessness256 = ((Cache_.Size() + 1) << 8) / (Active() + 1);
+
+                    //resulting trigger thresholds:
+                    //at the soft limit, if connections are sitting in the cache rather than in use
+                    //halfway between the soft and hard limits, if more than half of the connections are cached
+                    //close to the hard limit, try to purge the cache almost constantly
+                    if ((closenessToHardLimit256 + cacheUselessness256) >= 256U) {
+                        TGuard<TMutex> g(PurgeMutex_);
+
+                        CondPurge_.Signal();
+                        return; //memo: thread MUST unlock InPurging_ (see DoExecute())
+                    }
+                }
+                AtomicUnlock(&InPurging_);
+            }
+        }
+
+        void DoExecute() override {
+            while (true) {
+                {
+                    TGuard<TMutex> g(PurgeMutex_);
+
+                    if (Shutdown_)
+                        return;
+
+                    CondPurge_.WaitI(PurgeMutex_);
+                }
+
+                PurgeCache();
+
+                AtomicUnlock(&InPurging_);
+            }
+        }
+
+        void PurgeCache() noexcept {
+            //try remove at least ExceedSoftLimit() oldest connections from cache
+            //compute the fraction of the cache that has to be purged (in 1/256 units, but no less than 1/32 of the cache)
+            size_t frac256 = Min(size_t(Max(size_t(256U / 32U), (ExceedSoftLimit() << 8) / (Cache_.Size() + 1))), (size_t)256U);
+
+            size_t processed = 0;
+            size_t maxConnId = AtomicGet(MaxConnId_);
+            for (size_t i = 0; i <= maxConnId && !Shutdown_; ++i) {
+                processed += Cache_.Purge(i, frac256);
+                if (processed > 32) {
+#ifdef DEBUG_STAT
+                    TDebugStat::ConnPurgedInCache += processed;
+#endif
+                    processed = 0;
+                    Sleep(TDuration::MilliSeconds(10)); //prevent big spike cpu/system usage
+                }
+            }
+        }
+
+        TFdLimits Limits;
+        TExecutorsPool EP_;
+
+        TConnCache<THttpConn> Cache_;
+        TAtomic InPurging_;
+        TAtomic MaxConnId_;
+
+        TAutoPtr<IThreadFactory::IThread> T_;
+        TCondVar CondPurge_;
+        TMutex PurgeMutex_;
+        TAtomicBool Shutdown_;
+    };
+
+    THttpConnManager* HttpConnManager() {
+        return Singleton<THttpConnManager>();
+    }
+
+    TAtomicCounter* HttpOutConnCounter() {
+        return &HttpConnManager()->TotalConn;
+    }
+
+    THttpConnRef THttpConn::Create(TIOService& srv) {
+        if (HttpConnManager()->IsShutdown()) {
+            throw yexception() << "can't create connection with shutdowned service";
+        }
+
+        return new THttpConn(srv);
+    }
+
+    void THttpConn::PutSelfToCache() {
+        THttpConnRef c(this);
+        HttpConnManager()->Put(c, AddrId_);
+    }
+
+    void THttpConn::OnConnectFailed(const TErrorCode& ec) {
+        THttpRequestRef r(GetRequest());
+        if (!!r) {
+            r->OnConnectFailed(this, ec);
+        }
+        OnError(ec);
+    }
+
+    void THttpConn::OnError(const TString& errText) {
+        Finalized_ = true;
+        if (Connected_) {
+            Connected_ = false;
+            TErrorCode ec;
+            AS_.Shutdown(NAsio::TTcpSocket::ShutdownBoth, ec);
+        } else {
+            if (AS_.IsOpen()) {
+                AS_.AsyncCancel();
+            }
+        }
+
THttpRequestRef r(ReleaseRequest()); + if (!!r) { + r->OnError(this, errText); + } else { + if (Cached_) { + size_t res = HttpConnManager()->OnConnError(AddrId_); + Y_UNUSED(res); +#ifdef DEBUG_STAT + TDebugStat::ConnDestroyedInCache += res; +#endif + } + } + } + + void THttpRequest::Run(THttpRequestRef& req) { +#ifdef DEBUG_STAT + if ((++TDebugStat::RequestTotal & 0xFFF) == 0) { + TDebugStat::Print(); + } +#endif + try { + while (!Canceled_) { + THttpConnRef conn; + if (HttpConnManager()->Get(conn, Addr_->Id)) { + DBGOUT("Use connection from cache"); + Conn_ = conn; //thread magic + if (!conn->StartNextRequest(req, RequestSettings_.UseAsyncSendRequest)) { + continue; //if use connection from cache, ignore write error and try another conn + } + } else { + HttpConnManager()->CheckLimits(); //here throw exception if reach hard limit (or atexit() state) + Conn_ = THttpConn::Create(HttpConnManager()->GetIOService()); + TEndpoint ep(new NAddr::TAddrInfo(&*AddrIter_)); + Conn_->StartRequest(req, ep, Addr_->Id, THttp2Options::SymptomSlowConnect); // can throw + } + break; + } + } catch (...) { + Conn_.Reset(); + throw; + } + } + + //it seems we have lost TCP SYN packet, create extra connection for decrease response time + void THttpRequest::OnDetectSlowConnecting() { +#ifdef DEBUG_STAT + ++TDebugStat::ConnSlow; +#endif + //use some io_service (Run() thread-executor), from first conn. for more thread safety + THttpConnRef conn = GetConn(); + + if (!conn) { + return; + } + + THttpConnRef conn2; + try { + conn2 = THttpConn::Create(conn->GetIOService()); + } catch (...) { + return; // cant create spare connection, simple continue use only main + } + + { + TGuard<TSpinLock> g(SL_); + Conn2_ = conn2; + } + + if (Y_UNLIKELY(Canceled_)) { + ReleaseConn2(); + } else { + //use connect timeout for disable detecting slow connecting on second conn. + TEndpoint ep(new NAddr::TAddrInfo(&*Addr_->Addr.Begin())); + try { + conn2->StartRequest(WeakThis_, ep, Addr_->Id, THttp2Options::ConnectTimeout); + } catch (...) { + // ignore errors on spare connection + ReleaseConn2(); + } + } + } + + void THttpRequest::OnConnect(THttpConn* c) { + THttpConnRef extraConn; + { + TGuard<TSpinLock> g(SL_); + if (Y_UNLIKELY(!!Conn2_)) { + //has pair concurrent conn, 'should stay only one' + if (Conn2_.Get() == c) { +#ifdef DEBUG_STAT + ++TDebugStat::Conn2Success; +#endif + Conn2_.Swap(Conn_); + } + extraConn.Swap(Conn2_); + } + } + if (!!extraConn) { + extraConn->DetachRequest(); //prevent call OnError() + extraConn->Close(); + } + } + + void THttpRequest::OnResponse(TAutoPtr<THttpParser>& rsp) { + DBGOUT("THttpRequest::OnResponse()"); + ReleaseConn(); + if (Y_LIKELY(((rsp->RetCode() >= 200 && rsp->RetCode() < (!THttp2Options::RedirectionNotError ? 
300 : 400)) || THttp2Options::AnyResponseIsNotError))) { + NotifyResponse(rsp->DecodedContent(), rsp->FirstLine(), rsp->Headers()); + } else { + TString message; + + if (THttp2Options::FullHeadersAsErrorMessage) { + TStringStream err; + err << rsp->FirstLine(); + + THttpHeaders hdrs = rsp->Headers(); + for (auto h = hdrs.begin(); h < hdrs.end(); h++) { + err << h->ToString() << TStringBuf("\r\n"); + } + + message = err.Str(); + } else if (THttp2Options::UseResponseAsErrorMessage) { + message = rsp->DecodedContent(); + } else { + TStringStream err; + err << TStringBuf("request failed(") << rsp->FirstLine() << TStringBuf(")"); + message = err.Str(); + } + + NotifyError(new TError(message, TError::ProtocolSpecific, rsp->RetCode()), rsp.Get()); + } + } + + void THttpRequest::OnConnectFailed(THttpConn* c, const TErrorCode& ec) { + DBGOUT("THttpRequest::OnConnectFailed()"); + //detach/discard failed conn, try connect to next ip addr (if can) + THttpConnRef cc(GetConn()); + if (c != cc.Get() || AddrIter_ == Addr_->Addr.End() || ++AddrIter_ == Addr_->Addr.End() || Canceled_) { + return OnSystemError(c, ec); + } + // can try next host addr + c->DetachRequest(); + c->Close(); + THttpConnRef nextConn; + try { + nextConn = THttpConn::Create(HttpConnManager()->GetIOService()); + } catch (...) { + OnSystemError(nullptr, ec); + return; + } + { + THttpConnRef nc = nextConn; + TGuard<TSpinLock> g(SL_); + Conn_.Swap(nc); + } + TEndpoint ep(new NAddr::TAddrInfo(&*AddrIter_)); + try { + nextConn->StartRequest(WeakThis_, ep, Addr_->Id, THttp2Options::SymptomSlowConnect); + } catch (...) { + OnError(nullptr, CurrentExceptionMessage()); + return; + } + + if (Canceled_) { + OnError(nullptr, "canceled"); + } + } + + void THttpRequest::OnSystemError(THttpConn* c, const TErrorCode& ec) { + DBGOUT("THttpRequest::OnSystemError()"); + NotifyError(ec.Text(), TError::TType::UnknownType, 0, ec.Value()); + Finalize(c); + } + + void THttpRequest::OnError(THttpConn* c, const TString& errorText) { + DBGOUT("THttpRequest::OnError()"); + NotifyError(errorText); + Finalize(c); + } + + bool THttpRequest::RequestSendedCompletely() noexcept { + if (RequestSendedCompletely_) { + return true; + } + + THttpConnRef c(GetConn()); + return !!c ? c->RequestSendedCompletely() : false; + } + + void THttpRequest::Cancel() noexcept { + if (!Canceled_) { + Canceled_ = true; + try { + static const TString canceled("Canceled"); + NotifyError(canceled, TError::Cancelled); + Finalize(); + } catch (...) 
{ + } + } + } + + inline void FinalizeConn(THttpConnRef& c, THttpConn* skipConn) noexcept { + if (!!c && c.Get() != skipConn) { + c->DetachRequest(); + c->Close(); + } + } + + void THttpRequest::Finalize(THttpConn* skipConn) noexcept { + THttpConnRef c1(ReleaseConn()); + FinalizeConn(c1, skipConn); + THttpConnRef c2(ReleaseConn2()); + FinalizeConn(c2, skipConn); + } + + /////////////////////////////////// server side //////////////////////////////////// + + TAtomicCounter* HttpInConnCounter() { + return Singleton<TAtomicCounter>(); + } + + TFdLimits* HttpInConnLimits() { + return Singleton<TFdLimits>(); + } + + class THttpServer: public IRequester { + typedef TAutoPtr<TTcpAcceptor> TTcpAcceptorPtr; + typedef TAtomicSharedPtr<TTcpSocket> TTcpSocketRef; + class TConn; + typedef TSharedPtrB<TConn> TConnRef; + + class TRequest: public IHttpRequest { + public: + TRequest(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser>& p) + : C_(c) + , P_(p) + , RemoteHost_(C_->RemoteHost()) + , CompressionScheme_(P_->GetBestCompressionScheme()) + , H_(TStringBuf(P_->FirstLine())) + { + } + + ~TRequest() override { + if (!!C_) { + try { + C_->SendError(Id(), 503, "service unavailable (request ignored)", P_->HttpVersion(), {}); + } catch (...) { + DBGOUT("~TRequest()::SendFail() exception"); + } + } + } + + TAtomicBase Id() const { + return Id_; + } + + protected: + TStringBuf Scheme() const override { + return TStringBuf("http"); + } + + TString RemoteHost() const override { + return RemoteHost_; + } + + TStringBuf Service() const override { + return TStringBuf(H_.Path).Skip(1); + } + + const THttpHeaders& Headers() const override { + return P_->Headers(); + } + + TStringBuf Method() const override { + return H_.Method; + } + + TStringBuf Body() const override { + return P_->DecodedContent(); + } + + TStringBuf Cgi() const override { + return H_.Cgi; + } + + TStringBuf RequestId() const override { + return TStringBuf(); + } + + bool Canceled() const override { + if (!C_) { + return false; + } + return C_->IsCanceled(); + } + + void SendReply(TData& data) override { + SendReply(data, TString(), HttpCodes::HTTP_OK); + } + + void SendReply(TData& data, const TString& headers, int httpCode) override { + if (!!C_) { + C_->Send(Id(), data, CompressionScheme_, P_->HttpVersion(), headers, httpCode); + C_.Reset(); + } + } + + void SendError(TResponseError err, const THttpErrorDetails& details) override { + static const unsigned errorToHttpCode[IRequest::MaxResponseError] = + { + 400, + 403, + 404, + 429, + 500, + 501, + 502, + 503, + 509}; + + if (!!C_) { + C_->SendError(Id(), errorToHttpCode[err], details.Details, P_->HttpVersion(), details.Headers); + C_.Reset(); + } + } + + static TAtomicBase NextId() { + static TAtomic idGenerator = 0; + TAtomicBase id = 0; + do { + id = AtomicIncrement(idGenerator); + } while (!id); + return id; + } + + TSharedPtrB<TConn> C_; + TAutoPtr<THttpParser> P_; + TString RemoteHost_; + TString CompressionScheme_; + TParsedHttpFull H_; + TAtomicBase Id_ = NextId(); + }; + + class TRequestGet: public TRequest { + public: + TRequestGet(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser> p) + : TRequest(c, p) + { + } + + TStringBuf Data() const override { + return H_.Cgi; + } + }; + + class TRequestPost: public TRequest { + public: + TRequestPost(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser> p) + : TRequest(c, p) + { + } + + TStringBuf Data() const override { + return P_->DecodedContent(); + } + }; + + class TConn { + private: + TConn(THttpServer& hs, const TTcpSocketRef& s) + : HS_(hs) + , AS_(s) + , 
RemoteHost_(NNeh::PrintHostByRfc(*AS_->RemoteEndpoint().Addr())) + , BuffSize_(THttp2Options::InputBufferSize) + , Buff_(new char[BuffSize_]) + , Canceled_(false) + , LeftRequestsToDisconnect_(hs.LimitRequestsPerConnection) + { + DBGOUT("THttpServer::TConn()"); + HS_.OnCreateConn(); + } + + inline TConnRef SelfRef() noexcept { + return WeakThis_; + } + + public: + static void Create(THttpServer& hs, const TTcpSocketRef& s) { + TSharedPtrB<TConn> conn(new TConn(hs, s)); + conn->WeakThis_ = conn; + conn->ExpectNewRequest(); + conn->AS_->AsyncPollRead(std::bind(&TConn::OnCanRead, conn, _1, _2), THttp2Options::ServerInputDeadline); + } + + ~TConn() { + DBGOUT("~THttpServer::TConn(" << (!AS_ ? -666 : AS_->Native()) << ")"); + HS_.OnDestroyConn(); + } + + private: + void ExpectNewRequest() { + P_.Reset(new THttpParser(THttpParser::Request)); + P_->Prepare(); + } + + void OnCanRead(const TErrorCode& ec, IHandlingContext& ctx) { + if (ec) { + OnError(); + } else { + TErrorCode ec2; + OnReadSome(ec2, AS_->ReadSome(Buff_.Get(), BuffSize_, ec2), ctx); + } + } + + void OnError() { + DBGOUT("Srv OnError(" << (!AS_ ? -666 : AS_->Native()) << ")"); + Canceled_ = true; + AS_->AsyncCancel(); + } + + void OnReadSome(const TErrorCode& ec, size_t amount, IHandlingContext& ctx) { + if (ec || !amount) { + OnError(); + + return; + } + + DBGOUT("ReadSome(" << (!AS_ ? -666 : AS_->Native()) << "): " << amount); + try { + size_t buffPos = 0; + //DBGOUT("receive and parse: " << TStringBuf(Buff_.Get(), amount)); + while (P_->Parse(Buff_.Get() + buffPos, amount - buffPos)) { + SeenMessageWithoutKeepalive_ |= !P_->IsKeepAlive() || LeftRequestsToDisconnect_ == 1; + char rt = *P_->FirstLine().data(); + const size_t extraDataSize = P_->GetExtraDataSize(); + if (rt == 'P' || rt == 'p') { + OnRequest(new TRequestPost(WeakThis_, P_)); + } else { + OnRequest(new TRequestGet(WeakThis_, P_)); + } + if (extraDataSize) { + // has http pipelining + buffPos = amount - extraDataSize; + ExpectNewRequest(); + } else { + ExpectNewRequest(); + ctx.ContinueUseHandler(HS_.GetKeepAliveTimeout()); + return; + } + } + ctx.ContinueUseHandler(THttp2Options::ServerInputDeadline); + } catch (...) 
{ + OnError(); + } + } + + void OnRequest(TRequest* r) { + DBGOUT("OnRequest()"); + if (AtomicGet(PrimaryResponse_)) { + // has pipelining + PipelineOrder_.Enqueue(r->Id()); + } else { + AtomicSet(PrimaryResponse_, r->Id()); + } + HS_.OnRequest(r); + OnRequestDone(); + } + + void OnRequestDone() { + DBGOUT("OnRequestDone()"); + if (LeftRequestsToDisconnect_ > 0) { + --LeftRequestsToDisconnect_; + } + } + + static void PrintHttpVersion(IOutputStream& out, const THttpVersion& ver) { + out << TStringBuf("HTTP/") << ver.Major << TStringBuf(".") << ver.Minor; + } + + struct TResponseData : TThrRefBase { + TResponseData(size_t reqId, TTcpSocket::TSendedData data) + : RequestId_(reqId) + , Data_(data) + { + } + + size_t RequestId_; + TTcpSocket::TSendedData Data_; + }; + typedef TIntrusivePtr<TResponseData> TResponseDataRef; + + public: + //called non thread-safe (from outside thread) + void Send(TAtomicBase requestId, TData& data, const TString& compressionScheme, const THttpVersion& ver, const TString& headers, int httpCode) { + class THttpResponseFormatter { + public: + THttpResponseFormatter(TData& theData, const TString& contentEncoding, const THttpVersion& theVer, const TString& theHeaders, int theHttpCode, bool closeConnection) { + Header.Reserve(128 + contentEncoding.size() + theHeaders.size()); + PrintHttpVersion(Header, theVer); + Header << TStringBuf(" ") << theHttpCode << ' ' << HttpCodeStr(theHttpCode); + if (Compress(theData, contentEncoding)) { + Header << TStringBuf("\r\nContent-Encoding: ") << contentEncoding; + } + Header << TStringBuf("\r\nContent-Length: ") << theData.size(); + if (closeConnection) { + Header << TStringBuf("\r\nConnection: close"); + } else if (Y_LIKELY(theVer.Major > 1 || theVer.Minor > 0)) { + // since HTTP/1.1 Keep-Alive is default behaviour + Header << TStringBuf("\r\nConnection: Keep-Alive"); + } + if (theHeaders) { + Header << theHeaders; + } + Header << TStringBuf("\r\n\r\n"); + + Body.swap(theData); + + Parts[0].buf = Header.Data(); + Parts[0].len = Header.Size(); + Parts[1].buf = Body.data(); + Parts[1].len = Body.size(); + } + + TStringStream Header; + TData Body; + IOutputStream::TPart Parts[2]; + }; + + class TBuffers: public THttpResponseFormatter, public TTcpSocket::IBuffers { + public: + TBuffers(TData& theData, const TString& contentEncoding, const THttpVersion& theVer, const TString& theHeaders, int theHttpCode, bool closeConnection) + : THttpResponseFormatter(theData, contentEncoding, theVer, theHeaders, theHttpCode, closeConnection) + , IOVec(Parts, 2) + { + } + + TContIOVector* GetIOvec() override { + return &IOVec; + } + + TContIOVector IOVec; + }; + + TTcpSocket::TSendedData sd(new TBuffers(data, compressionScheme, ver, headers, httpCode, SeenMessageWithoutKeepalive_)); + SendData(requestId, sd); + } + + //called non thread-safe (from outside thread) + void SendError(TAtomicBase requestId, unsigned httpCode, const TString& descr, const THttpVersion& ver, const TString& headers) { + if (Canceled_) { + return; + } + + class THttpErrorResponseFormatter { + public: + THttpErrorResponseFormatter(unsigned theHttpCode, const TString& theDescr, const THttpVersion& theVer, bool closeConnection, const TString& headers) { + PrintHttpVersion(Answer, theVer); + Answer << TStringBuf(" ") << theHttpCode << TStringBuf(" "); + if (theDescr.size() && !THttp2Options::ErrorDetailsAsResponseBody) { + // Reason-Phrase = *<TEXT, excluding CR, LF> + // replace bad chars to '.' 
+ TString reasonPhrase = theDescr; + for (TString::iterator it = reasonPhrase.begin(); it != reasonPhrase.end(); ++it) { + char& ch = *it; + if (ch == ' ') { + continue; + } + if (((ch & 31) == ch) || static_cast<unsigned>(ch) == 127 || (static_cast<unsigned>(ch) & 0x80)) { + //CTLs || DEL(127) || non ascii + // (ch <= 32) || (ch >= 127) + ch = '.'; + } + } + Answer << reasonPhrase; + } else { + Answer << HttpCodeStr(static_cast<int>(theHttpCode)); + } + + if (closeConnection) { + Answer << TStringBuf("\r\nConnection: close"); + } + + if (headers) { + Answer << "\r\n" << headers; + } + + if (THttp2Options::ErrorDetailsAsResponseBody) { + Answer << TStringBuf("\r\nContent-Length:") << theDescr.size() << "\r\n\r\n" << theDescr; + } else { + Answer << "\r\n" + "Content-Length:0\r\n\r\n"sv; + } + + Parts[0].buf = Answer.Data(); + Parts[0].len = Answer.Size(); + } + + TStringStream Answer; + IOutputStream::TPart Parts[1]; + }; + + class TBuffers: public THttpErrorResponseFormatter, public TTcpSocket::IBuffers { + public: + TBuffers( + unsigned theHttpCode, + const TString& theDescr, + const THttpVersion& theVer, + bool closeConnection, + const TString& headers + ) + : THttpErrorResponseFormatter(theHttpCode, theDescr, theVer, closeConnection, headers) + , IOVec(Parts, 1) + { + } + + TContIOVector* GetIOvec() override { + return &IOVec; + } + + TContIOVector IOVec; + }; + + TTcpSocket::TSendedData sd(new TBuffers(httpCode, descr, ver, SeenMessageWithoutKeepalive_, headers)); + SendData(requestId, sd); + } + + void ProcessPipeline() { + // on successfull response to current (PrimaryResponse_) request + TAtomicBase requestId; + if (PipelineOrder_.Dequeue(&requestId)) { + TAtomicBase oldReqId; + do { + oldReqId = AtomicGet(PrimaryResponse_); + Y_VERIFY(oldReqId, "race inside http pipelining"); + } while (!AtomicCas(&PrimaryResponse_, requestId, oldReqId)); + + ProcessResponsesData(); + } else { + TAtomicBase oldReqId = AtomicGet(PrimaryResponse_); + if (oldReqId) { + while (!AtomicCas(&PrimaryResponse_, 0, oldReqId)) { + Y_VERIFY(oldReqId == AtomicGet(PrimaryResponse_), "race inside http pipelining [2]"); + } + } + } + } + + void ProcessResponsesData() { + // process responses data queue, send response (if already have next PrimaryResponse_) + TResponseDataRef rd; + while (ResponsesDataQueue_.Dequeue(&rd)) { + ResponsesData_[rd->RequestId_] = rd; + } + TAtomicBase requestId = AtomicGet(PrimaryResponse_); + if (requestId) { + THashMap<TAtomicBase, TResponseDataRef>::iterator it = ResponsesData_.find(requestId); + if (it != ResponsesData_.end()) { + // has next primary response + rd = it->second; + ResponsesData_.erase(it); + AS_->AsyncWrite(rd->Data_, std::bind(&TConn::OnSend, SelfRef(), _1, _2, _3), THttp2Options::ServerOutputDeadline); + } + } + } + + private: + void SendData(TAtomicBase requestId, TTcpSocket::TSendedData sd) { + TContIOVector& vec = *sd->GetIOvec(); + + if (requestId != AtomicGet(PrimaryResponse_)) { + // already has another request for response first, so push this to queue + // + enqueue event for safe checking queue (at local/transport thread) + TResponseDataRef rdr = new TResponseData(requestId, sd); + ResponsesDataQueue_.Enqueue(rdr); + AS_->GetIOService().Post(std::bind(&TConn::ProcessResponsesData, SelfRef())); + return; + } + if (THttp2Options::ServerUseDirectWrite) { + vec.Proceed(AS_->WriteSome(vec)); + } + if (!vec.Complete()) { + DBGOUT("AsyncWrite()"); + AS_->AsyncWrite(sd, std::bind(&TConn::OnSend, SelfRef(), _1, _2, _3), THttp2Options::ServerOutputDeadline); + } 
else { + // run ProcessPipeline at safe thread + AS_->GetIOService().Post(std::bind(&TConn::ProcessPipeline, SelfRef())); + } + } + + void OnSend(const TErrorCode& ec, size_t amount, IHandlingContext&) { + Y_UNUSED(amount); + if (ec) { + OnError(); + } else { + ProcessPipeline(); + } + + if (SeenMessageWithoutKeepalive_) { + TErrorCode shutdown_ec; + AS_->Shutdown(TTcpSocket::ShutdownBoth, shutdown_ec); + } + } + + public: + bool IsCanceled() const noexcept { + return Canceled_; + } + + const TString& RemoteHost() const noexcept { + return RemoteHost_; + } + + private: + TWeakPtrB<TConn> WeakThis_; + THttpServer& HS_; + TTcpSocketRef AS_; + TString RemoteHost_; + size_t BuffSize_; + TArrayHolder<char> Buff_; + TAutoPtr<THttpParser> P_; + // pipeline supporting + TAtomic PrimaryResponse_ = 0; + TLockFreeQueue<TAtomicBase> PipelineOrder_; + TLockFreeQueue<TResponseDataRef> ResponsesDataQueue_; + THashMap<TAtomicBase, TResponseDataRef> ResponsesData_; + + TAtomicBool Canceled_; + bool SeenMessageWithoutKeepalive_ = false; + + i32 LeftRequestsToDisconnect_ = -1; + }; + + /////////////////////////////////////////////////////////// + + public: + THttpServer(IOnRequest* cb, const TParsedLocation& loc) + : E_(THttp2Options::AsioServerThreads) + , CB_(cb) + , LimitRequestsPerConnection(THttp2Options::LimitRequestsPerConnection) + { + TNetworkAddress addr(loc.GetPort()); + + for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) { + TEndpoint ep(new NAddr::TAddrInfo(&*it)); + TTcpAcceptorPtr a(new TTcpAcceptor(AcceptExecutor_.GetIOService())); + DBGOUT("bind:" << ep.IpToString() << ":" << ep.Port()); + a->Bind(ep); + a->Listen(THttp2Options::Backlog); + StartAccept(a.Get()); + A_.push_back(a); + } + } + + ~THttpServer() override { + AcceptExecutor_.SyncShutdown(); //cancel operation for all current sockets (include acceptors) + A_.clear(); //stop listening + E_.SyncShutdown(); + } + + void OnAccept(TTcpAcceptor* a, TAtomicSharedPtr<TTcpSocket> s, const TErrorCode& ec, IHandlingContext&) { + if (Y_UNLIKELY(ec)) { + if (ec.Value() == ECANCELED) { + return; + } else if (ec.Value() == EMFILE || ec.Value() == ENFILE || ec.Value() == ENOMEM || ec.Value() == ENOBUFS) { + //reach some os limit, suspend accepting + TAtomicSharedPtr<TDeadlineTimer> dt(new TDeadlineTimer(a->GetIOService())); + dt->AsyncWaitExpireAt(TDuration::Seconds(30), std::bind(&THttpServer::OnTimeoutSuspendAccept, this, a, dt, _1, _2)); + return; + } else { + Cdbg << "acc: " << ec.Text() << Endl; + } + } else { + if (static_cast<size_t>(HttpInConnCounter()->Val()) < HttpInConnLimits()->Hard()) { + try { + SetNonBlock(s->Native()); + PrepareSocket(s->Native()); + TConn::Create(*this, s); + } catch (TSystemError& err) { + TErrorCode ec2(err.Status()); + Cdbg << "acc: " << ec2.Text() << Endl; + } + } //else accepted socket will be closed + } + StartAccept(a); //continue accepting + } + + void OnTimeoutSuspendAccept(TTcpAcceptor* a, TAtomicSharedPtr<TDeadlineTimer>, const TErrorCode& ec, IHandlingContext&) { + if (!ec) { + DBGOUT("resume acceptor") + StartAccept(a); + } + } + + void OnRequest(IRequest* r) { + try { + CB_->OnRequest(r); + } catch (...) 
{ + Cdbg << CurrentExceptionMessage() << Endl; + } + } + + protected: + void OnCreateConn() noexcept { + HttpInConnCounter()->Inc(); + } + + void OnDestroyConn() noexcept { + HttpInConnCounter()->Dec(); + } + + TDuration GetKeepAliveTimeout() const noexcept { + size_t cc = HttpInConnCounter()->Val(); + TFdLimits lim(*HttpInConnLimits()); + + if (!TFdLimits::ExceedLimit(cc, lim.Soft())) { + return THttp2Options::ServerInputDeadlineKeepAliveMax; + } + + if (cc > lim.Hard()) { + cc = lim.Hard(); + } + TDuration::TValue softTuneRange = THttp2Options::ServerInputDeadlineKeepAliveMax.Seconds() - THttp2Options::ServerInputDeadlineKeepAliveMin.Seconds(); + + return TDuration::Seconds((softTuneRange * (cc - lim.Soft())) / (lim.Hard() - lim.Soft() + 1)) + THttp2Options::ServerInputDeadlineKeepAliveMin; + } + + private: + void StartAccept(TTcpAcceptor* a) { + TAtomicSharedPtr<TTcpSocket> s(new TTcpSocket(E_.Size() ? E_.GetExecutor().GetIOService() : AcceptExecutor_.GetIOService())); + a->AsyncAccept(*s, std::bind(&THttpServer::OnAccept, this, a, s, _1, _2)); + } + + TIOServiceExecutor AcceptExecutor_; + TVector<TTcpAcceptorPtr> A_; + TExecutorsPool E_; + IOnRequest* CB_; + + public: + const i32 LimitRequestsPerConnection; + }; + + template <class T> + class THttp2Protocol: public IProtocol { + public: + IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override { + return new THttpServer(cb, loc); + } + + THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override { + THttpRequest::THandleRef ret(new THttpRequest::THandle(fallback, msg, !ss ? nullptr : new TStatCollector(ss))); + try { + THttpRequest::Run(ret, msg, &T::Build, T::RequestSettings()); + } catch (...) { + ret->ResetOnRecv(); + throw; + } + return ret.Get(); + } + + THandleRef ScheduleAsyncRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss, bool useAsyncSendRequest) override { + THttpRequest::THandleRef ret(new THttpRequest::THandle(fallback, msg, !ss ? nullptr : new TStatCollector(ss))); + try { + auto requestSettings = T::RequestSettings(); + requestSettings.SetUseAsyncSendRequest(useAsyncSendRequest); + THttpRequest::Run(ret, msg, &T::Build, requestSettings); + } catch (...) 
{ + ret->ResetOnRecv(); + throw; + } + return ret.Get(); + } + + TStringBuf Scheme() const noexcept override { + return T::Name(); + } + + bool SetOption(TStringBuf name, TStringBuf value) override { + return THttp2Options::Set(name, value); + } + }; +} + +namespace NNeh { + IProtocol* Http1Protocol() { + return Singleton<THttp2Protocol<TRequestGet1>>(); + } + IProtocol* Post1Protocol() { + return Singleton<THttp2Protocol<TRequestPost1>>(); + } + IProtocol* Full1Protocol() { + return Singleton<THttp2Protocol<TRequestFull1>>(); + } + IProtocol* Http2Protocol() { + return Singleton<THttp2Protocol<TRequestGet2>>(); + } + IProtocol* Post2Protocol() { + return Singleton<THttp2Protocol<TRequestPost2>>(); + } + IProtocol* Full2Protocol() { + return Singleton<THttp2Protocol<TRequestFull2>>(); + } + IProtocol* UnixSocketGetProtocol() { + return Singleton<THttp2Protocol<TRequestUnixSocketGet>>(); + } + IProtocol* UnixSocketPostProtocol() { + return Singleton<THttp2Protocol<TRequestUnixSocketPost>>(); + } + IProtocol* UnixSocketFullProtocol() { + return Singleton<THttp2Protocol<TRequestUnixSocketFull>>(); + } + + void SetHttp2OutputConnectionsLimits(size_t softLimit, size_t hardLimit) { + HttpConnManager()->SetLimits(softLimit, hardLimit); + } + + void SetHttp2InputConnectionsLimits(size_t softLimit, size_t hardLimit) { + HttpInConnLimits()->SetSoft(softLimit); + HttpInConnLimits()->SetHard(hardLimit); + } + + TAtomicBase GetHttpOutputConnectionCount() { + return HttpOutConnCounter()->Val(); + } + + std::pair<size_t, size_t> GetHttpOutputConnectionLimits() { + return HttpConnManager()->GetLimits(); + } + + TAtomicBase GetHttpInputConnectionCount() { + return HttpInConnCounter()->Val(); + } + + void SetHttp2InputConnectionsTimeouts(unsigned minSeconds, unsigned maxSeconds) { + THttp2Options::ServerInputDeadlineKeepAliveMin = TDuration::Seconds(minSeconds); + THttp2Options::ServerInputDeadlineKeepAliveMax = TDuration::Seconds(maxSeconds); + } + + class TUnixSocketResolver { + public: + NDns::TResolvedHost* Resolve(const TString& path) { + TString unixSocketPath = path; + if (path.size() > 2 && path[0] == '[' && path[path.size() - 1] == ']') { + unixSocketPath = path.substr(1, path.size() - 2); + } + + if (auto resolvedUnixSocket = ResolvedUnixSockets_.FindPtr(unixSocketPath)) { + return resolvedUnixSocket->Get(); + } + + TNetworkAddress na{TUnixSocketPath(unixSocketPath)}; + ResolvedUnixSockets_[unixSocketPath] = MakeHolder<NDns::TResolvedHost>(unixSocketPath, na); + + return ResolvedUnixSockets_[unixSocketPath].Get(); + } + + private: + THashMap<TString, THolder<NDns::TResolvedHost>> ResolvedUnixSockets_; + }; + + TUnixSocketResolver* UnixSocketResolver() { + return FastTlsSingleton<TUnixSocketResolver>(); + } + + const NDns::TResolvedHost* Resolve(const TStringBuf host, ui16 port, NHttp::EResolverType resolverType) { + if (resolverType == EResolverType::EUNIXSOCKET) { + return UnixSocketResolver()->Resolve(TString(host)); + } + return NDns::CachedResolve(NDns::TResolveInfo(host, port)); + + } +} |
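
The THttp2Options block near the top of the diff is the main tuning surface of this file: every static option can be overridden by name through THttp2Options::Set(), with the value parsed via FromString<T>(). A minimal sketch of that call pattern follows; it assumes THttp2Options is declared in library/cpp/neh/http2.h under the NNeh namespace (the header is not part of this commit), and TuneNehHttp2Options is a hypothetical helper, not code from the diff.

```cpp
// Hypothetical helper; header path and namespace qualification are assumptions.
#include <library/cpp/neh/http2.h>

#include <util/generic/yexception.h>

void TuneNehHttp2Options() {
    using NNeh::THttp2Options; // assumed qualification, see lead-in

    // Set() matches the option by name and parses the value with FromString<T>();
    // it returns false for an unknown option name.
    const bool ok =
        THttp2Options::Set("ConnectTimeout", "300ms") &&           // TDuration
        THttp2Options::Set("TcpKeepAlive", "true") &&               // bool
        THttp2Options::Set("LimitRequestsPerConnection", "1000");   // i32

    Y_ENSURE(ok, "unknown neh/http2 option name");
}
```

The same Set() is also reachable through THttp2Protocol::SetOption(), as defined at the end of the diff.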
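The limit setters defined in the closing NNeh block (SetHttp2OutputConnectionsLimits, SetHttp2InputConnectionsLimits, SetHttp2InputConnectionsTimeouts) control when THttpConnManager starts purging its client-side connection cache and how THttpServer::GetKeepAliveTimeout() picks the idle deadline for accepted connections. A short sketch, assuming these NNeh functions are exported through library/cpp/neh/http2.h (the header is not shown in this commit) and using example limit values:

```cpp
// Sketch only; the header declaring these setters is an assumption.
#include <library/cpp/neh/http2.h>

void ConfigureNehHttp2Limits() {
    // Client side: past the soft limit the purge thread starts evicting cached
    // connections; reaching the hard limit fails with
    // "neh::http2 output connections limit reached". 40000/50000 are the
    // defaults set in THttpConnManager's constructor.
    NNeh::SetHttp2OutputConnectionsLimits(/*softLimit=*/40000, /*hardLimit=*/50000);

    // Server side: below the soft limit idle keep-alive connections get
    // ServerInputDeadlineKeepAliveMax; above it, GetKeepAliveTimeout()
    // interpolates between the configured min and max based on the current
    // connection count. The limit values here are examples.
    NNeh::SetHttp2InputConnectionsLimits(/*softLimit=*/20000, /*hardLimit=*/30000);
    NNeh::SetHttp2InputConnectionsTimeouts(/*minSeconds=*/10, /*maxSeconds=*/120);
}
```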