aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/http2.cpp
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/http2.cpp
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/neh/http2.cpp')
-rw-r--r--library/cpp/neh/http2.cpp2102
1 files changed, 2102 insertions, 0 deletions
diff --git a/library/cpp/neh/http2.cpp b/library/cpp/neh/http2.cpp
new file mode 100644
index 0000000000..0bba29cf22
--- /dev/null
+++ b/library/cpp/neh/http2.cpp
@@ -0,0 +1,2102 @@
+#include "http2.h"
+
+#include "conn_cache.h"
+#include "details.h"
+#include "factory.h"
+#include "http_common.h"
+#include "smart_ptr.h"
+#include "utils.h"
+
+#include <library/cpp/http/push_parser/http_parser.h>
+#include <library/cpp/http/misc/httpcodes.h>
+#include <library/cpp/http/misc/parsed_request.h>
+#include <library/cpp/neh/asio/executor.h>
+
+#include <util/generic/singleton.h>
+#include <util/generic/vector.h>
+#include <util/network/iovec.h>
+#include <util/stream/output.h>
+#include <util/stream/zlib.h>
+#include <util/system/condvar.h>
+#include <util/system/mutex.h>
+#include <util/system/spinlock.h>
+#include <util/system/yassert.h>
+#include <util/thread/factory.h>
+#include <util/thread/singleton.h>
+#include <util/system/sanitizers.h>
+
+#include <atomic>
+
+#if defined(_unix_)
+#include <sys/ioctl.h>
+#endif
+
+#if defined(_linux_)
+#undef SIOCGSTAMP
+#undef SIOCGSTAMPNS
+#include <linux/sockios.h>
+#define FIONWRITE SIOCOUTQ
+#endif
+
+//#define DEBUG_HTTP2
+
+#ifdef DEBUG_HTTP2
+#define DBGOUT(args) Cout << args << Endl;
+#else
+#define DBGOUT(args)
+#endif
+
+using namespace NDns;
+using namespace NAsio;
+using namespace NNeh;
+using namespace NNeh::NHttp;
+using namespace NNeh::NHttp2;
+using namespace std::placeholders;
+
+//
+// has complex keep-alive references between entities in multi-thread enviroment,
+// this create risks for races/memory leak, etc..
+// so connecting/disconnecting entities must be doing carefully
+//
+// handler <=-> request <==> connection(socket) <= handlers, stored in io_service
+// ^
+// +== connections_cache
+// '=>' -- shared/intrusive ptr
+// '->' -- weak_ptr
+//
+
+static TDuration FixTimeoutForSanitizer(const TDuration timeout) {
+ ui64 multiplier = 1;
+ if (NSan::ASanIsOn()) {
+ // https://github.com/google/sanitizers/wiki/AddressSanitizer
+ multiplier = 4;
+ } else if (NSan::MSanIsOn()) {
+ // via https://github.com/google/sanitizers/wiki/MemorySanitizer
+ multiplier = 3;
+ } else if (NSan::TSanIsOn()) {
+ // via https://clang.llvm.org/docs/ThreadSanitizer.html
+ multiplier = 15;
+ }
+
+ return TDuration::FromValue(timeout.GetValue() * multiplier);
+}
+
+TDuration THttp2Options::ConnectTimeout = FixTimeoutForSanitizer(TDuration::MilliSeconds(1000));
+TDuration THttp2Options::InputDeadline = TDuration::Max();
+TDuration THttp2Options::OutputDeadline = TDuration::Max();
+TDuration THttp2Options::SymptomSlowConnect = FixTimeoutForSanitizer(TDuration::MilliSeconds(10));
+size_t THttp2Options::InputBufferSize = 16 * 1024;
+bool THttp2Options::KeepInputBufferForCachedConnections = false;
+size_t THttp2Options::AsioThreads = 4;
+size_t THttp2Options::AsioServerThreads = 4;
+bool THttp2Options::EnsureSendingCompleteByAck = false;
+int THttp2Options::Backlog = 100;
+TDuration THttp2Options::ServerInputDeadline = FixTimeoutForSanitizer(TDuration::MilliSeconds(500));
+TDuration THttp2Options::ServerOutputDeadline = TDuration::Max();
+TDuration THttp2Options::ServerInputDeadlineKeepAliveMax = FixTimeoutForSanitizer(TDuration::Seconds(120));
+TDuration THttp2Options::ServerInputDeadlineKeepAliveMin = FixTimeoutForSanitizer(TDuration::Seconds(10));
+bool THttp2Options::ServerUseDirectWrite = false;
+bool THttp2Options::UseResponseAsErrorMessage = false;
+bool THttp2Options::FullHeadersAsErrorMessage = false;
+bool THttp2Options::ErrorDetailsAsResponseBody = false;
+bool THttp2Options::RedirectionNotError = false;
+bool THttp2Options::AnyResponseIsNotError = false;
+bool THttp2Options::TcpKeepAlive = false;
+i32 THttp2Options::LimitRequestsPerConnection = -1;
+bool THttp2Options::QuickAck = false;
+bool THttp2Options::UseAsyncSendRequest = false;
+
+bool THttp2Options::Set(TStringBuf name, TStringBuf value) {
+#define HTTP2_TRY_SET(optType, optName) \
+ if (name == TStringBuf(#optName)) { \
+ optName = FromString<optType>(value); \
+ }
+
+ HTTP2_TRY_SET(TDuration, ConnectTimeout)
+ else HTTP2_TRY_SET(TDuration, InputDeadline)
+ else HTTP2_TRY_SET(TDuration, OutputDeadline)
+ else HTTP2_TRY_SET(TDuration, SymptomSlowConnect) else HTTP2_TRY_SET(size_t, InputBufferSize) else HTTP2_TRY_SET(bool, KeepInputBufferForCachedConnections) else HTTP2_TRY_SET(size_t, AsioThreads) else HTTP2_TRY_SET(size_t, AsioServerThreads) else HTTP2_TRY_SET(bool, EnsureSendingCompleteByAck) else HTTP2_TRY_SET(int, Backlog) else HTTP2_TRY_SET(TDuration, ServerInputDeadline) else HTTP2_TRY_SET(TDuration, ServerOutputDeadline) else HTTP2_TRY_SET(TDuration, ServerInputDeadlineKeepAliveMax) else HTTP2_TRY_SET(TDuration, ServerInputDeadlineKeepAliveMin) else HTTP2_TRY_SET(bool, ServerUseDirectWrite) else HTTP2_TRY_SET(bool, UseResponseAsErrorMessage) else HTTP2_TRY_SET(bool, FullHeadersAsErrorMessage) else HTTP2_TRY_SET(bool, ErrorDetailsAsResponseBody) else HTTP2_TRY_SET(bool, RedirectionNotError) else HTTP2_TRY_SET(bool, AnyResponseIsNotError) else HTTP2_TRY_SET(bool, TcpKeepAlive) else HTTP2_TRY_SET(i32, LimitRequestsPerConnection) else HTTP2_TRY_SET(bool, QuickAck)
+ else HTTP2_TRY_SET(bool, UseAsyncSendRequest) else {
+ return false;
+ }
+ return true;
+}
+
+namespace NNeh {
+ const NDns::TResolvedHost* Resolve(const TStringBuf host, ui16 port, NHttp::EResolverType resolverType);
+}
+
+namespace {
+//#define DEBUG_STAT
+
+#ifdef DEBUG_STAT
+ struct TDebugStat {
+ static std::atomic<size_t> ConnTotal;
+ static std::atomic<size_t> ConnActive;
+ static std::atomic<size_t> ConnCached;
+ static std::atomic<size_t> ConnDestroyed;
+ static std::atomic<size_t> ConnFailed;
+ static std::atomic<size_t> ConnConnCanceled;
+ static std::atomic<size_t> ConnSlow;
+ static std::atomic<size_t> Conn2Success;
+ static std::atomic<size_t> ConnPurgedInCache;
+ static std::atomic<size_t> ConnDestroyedInCache;
+ static std::atomic<size_t> RequestTotal;
+ static std::atomic<size_t> RequestSuccessed;
+ static std::atomic<size_t> RequestFailed;
+ static void Print() {
+ Cout << "ct=" << ConnTotal.load(std::memory_order_acquire)
+ << " ca=" << ConnActive.load(std::memory_order_acquire)
+ << " cch=" << ConnCached.load(std::memory_order_acquire)
+ << " cd=" << ConnDestroyed.load(std::memory_order_acquire)
+ << " cf=" << ConnFailed.load(std::memory_order_acquire)
+ << " ccc=" << ConnConnCanceled.load(std::memory_order_acquire)
+ << " csl=" << ConnSlow.load(std::memory_order_acquire)
+ << " c2s=" << Conn2Success.load(std::memory_order_acquire)
+ << " cpc=" << ConnPurgedInCache.load(std::memory_order_acquire)
+ << " cdc=" << ConnDestroyedInCache.load(std::memory_order_acquire)
+ << " rt=" << RequestTotal.load(std::memory_order_acquire)
+ << " rs=" << RequestSuccessed.load(std::memory_order_acquire)
+ << " rf=" << RequestFailed.load(std::memory_order_acquire)
+ << Endl;
+ }
+ };
+ std::atomic<size_t> TDebugStat::ConnTotal = 0;
+ std::atomic<size_t> TDebugStat::ConnActive = 0;
+ std::atomic<size_t> TDebugStat::ConnCached = 0;
+ std::atomic<size_t> TDebugStat::ConnDestroyed = 0;
+ std::atomic<size_t> TDebugStat::ConnFailed = 0;
+ std::atomic<size_t> TDebugStat::ConnConnCanceled = 0;
+ std::atomic<size_t> TDebugStat::ConnSlow = 0;
+ std::atomic<size_t> TDebugStat::Conn2Success = 0;
+ std::atomic<size_t> TDebugStat::ConnPurgedInCache = 0;
+ std::atomic<size_t> TDebugStat::ConnDestroyedInCache = 0;
+ std::atomic<size_t> TDebugStat::RequestTotal = 0;
+ std::atomic<size_t> TDebugStat::RequestSuccessed = 0;
+ std::atomic<size_t> TDebugStat::RequestFailed = 0;
+#endif
+
+ inline void PrepareSocket(SOCKET s, const TRequestSettings& requestSettings = TRequestSettings()) {
+ if (requestSettings.NoDelay) {
+ SetNoDelay(s, true);
+ }
+ }
+
+ bool Compress(TData& data, const TString& compressionScheme) {
+ if (compressionScheme == "gzip") {
+ try {
+ TData gzipped(data.size());
+ TMemoryOutput out(gzipped.data(), gzipped.size());
+ TZLibCompress c(&out, ZLib::GZip);
+ c.Write(data.data(), data.size());
+ c.Finish();
+ gzipped.resize(out.Buf() - gzipped.data());
+ data.swap(gzipped);
+ return true;
+ } catch (yexception&) {
+ // gzipped data occupies more space than original data
+ }
+ }
+ return false;
+ }
+
+ class THttpRequestBuffers: public NAsio::TTcpSocket::IBuffers {
+ public:
+ THttpRequestBuffers(TRequestData::TPtr rd)
+ : Req_(rd)
+ , Parts_(Req_->Parts())
+ , IOvec_(Parts_.data(), Parts_.size())
+ {
+ }
+
+ TContIOVector* GetIOvec() override {
+ return &IOvec_;
+ }
+
+ private:
+ TRequestData::TPtr Req_;
+ TVector<IOutputStream::TPart> Parts_;
+ TContIOVector IOvec_;
+ };
+
+ struct TRequestGet1: public TRequestGet {
+ static inline TStringBuf Name() noexcept {
+ return TStringBuf("http");
+ }
+ };
+
+ struct TRequestPost1: public TRequestPost {
+ static inline TStringBuf Name() noexcept {
+ return TStringBuf("post");
+ }
+ };
+
+ struct TRequestFull1: public TRequestFull {
+ static inline TStringBuf Name() noexcept {
+ return TStringBuf("full");
+ }
+ };
+
+ struct TRequestGet2: public TRequestGet {
+ static inline TStringBuf Name() noexcept {
+ return TStringBuf("http2");
+ }
+ };
+
+ struct TRequestPost2: public TRequestPost {
+ static inline TStringBuf Name() noexcept {
+ return TStringBuf("post2");
+ }
+ };
+
+ struct TRequestFull2: public TRequestFull {
+ static inline TStringBuf Name() noexcept {
+ return TStringBuf("full2");
+ }
+ };
+
+ struct TRequestUnixSocketGet: public TRequestGet {
+ static inline TStringBuf Name() noexcept {
+ return TStringBuf("http+unix");
+ }
+
+ static TRequestSettings RequestSettings() {
+ return TRequestSettings()
+ .SetNoDelay(false)
+ .SetResolverType(EResolverType::EUNIXSOCKET);
+ }
+ };
+
+ struct TRequestUnixSocketPost: public TRequestPost {
+ static inline TStringBuf Name() noexcept {
+ return TStringBuf("post+unix");
+ }
+
+ static TRequestSettings RequestSettings() {
+ return TRequestSettings()
+ .SetNoDelay(false)
+ .SetResolverType(EResolverType::EUNIXSOCKET);
+ }
+ };
+
+ struct TRequestUnixSocketFull: public TRequestFull {
+ static inline TStringBuf Name() noexcept {
+ return TStringBuf("full+unix");
+ }
+
+ static TRequestSettings RequestSettings() {
+ return TRequestSettings()
+ .SetNoDelay(false)
+ .SetResolverType(EResolverType::EUNIXSOCKET);
+ }
+ };
+
+ typedef TAutoPtr<THttpRequestBuffers> THttpRequestBuffersPtr;
+
+ class THttpRequest;
+ typedef TSharedPtrB<THttpRequest> THttpRequestRef;
+
+ class THttpConn;
+ typedef TIntrusivePtr<THttpConn> THttpConnRef;
+
+ typedef std::function<TRequestData::TPtr(const TMessage&, const TParsedLocation&)> TRequestBuilder;
+
+ class THttpRequest {
+ public:
+ class THandle: public TSimpleHandle {
+ public:
+ THandle(IOnRecv* f, const TMessage& msg, TStatCollector* s) noexcept
+ : TSimpleHandle(f, msg, s)
+ {
+ }
+
+ bool MessageSendedCompletely() const noexcept override {
+ if (TSimpleHandle::MessageSendedCompletely()) {
+ return true;
+ }
+
+ THttpRequestRef req(GetRequest());
+ if (!!req && req->RequestSendedCompletely()) {
+ const_cast<THandle*>(this)->SetSendComplete();
+ }
+
+ return TSimpleHandle::MessageSendedCompletely();
+ }
+
+ void Cancel() noexcept override {
+ if (TSimpleHandle::Canceled()) {
+ return;
+ }
+
+ THttpRequestRef req(GetRequest());
+ if (!!req) {
+ TSimpleHandle::Cancel();
+ req->Cancel();
+ }
+ }
+
+ void NotifyError(TErrorRef error, const THttpParser* rsp = nullptr) {
+#ifdef DEBUG_STAT
+ ++TDebugStat::RequestFailed;
+#endif
+ if (rsp) {
+ TSimpleHandle::NotifyError(error, rsp->DecodedContent(), rsp->FirstLine(), rsp->Headers());
+ } else {
+ TSimpleHandle::NotifyError(error);
+ }
+
+ ReleaseRequest();
+ }
+
+ //not thread safe!
+ void SetRequest(const TWeakPtrB<THttpRequest>& r) noexcept {
+ Req_ = r;
+ }
+
+ private:
+ THttpRequestRef GetRequest() const noexcept {
+ TGuard<TSpinLock> g(SP_);
+ return Req_;
+ }
+
+ void ReleaseRequest() noexcept {
+ TWeakPtrB<THttpRequest> tmp;
+ TGuard<TSpinLock> g(SP_);
+ tmp.Swap(Req_);
+ }
+
+ mutable TSpinLock SP_;
+ TWeakPtrB<THttpRequest> Req_;
+ };
+
+ typedef TIntrusivePtr<THandle> THandleRef;
+
+ static void Run(THandleRef& h, const TMessage& msg, TRequestBuilder f, const TRequestSettings& s) {
+ THttpRequestRef req(new THttpRequest(h, msg, f, s));
+ req->WeakThis_ = req;
+ h->SetRequest(req->WeakThis_);
+ req->Run(req);
+ }
+
+ ~THttpRequest() {
+ DBGOUT("~THttpRequest()");
+ }
+
+ private:
+ THttpRequest(THandleRef& h, TMessage msg, TRequestBuilder f, const TRequestSettings& s)
+ : Hndl_(h)
+ , RequestBuilder_(f)
+ , RequestSettings_(s)
+ , Msg_(std::move(msg))
+ , Loc_(Msg_.Addr)
+ , Addr_(Resolve(Loc_.Host, Loc_.GetPort(), RequestSettings_.ResolverType))
+ , AddrIter_(Addr_->Addr.Begin())
+ , Canceled_(false)
+ , RequestSendedCompletely_(false)
+ {
+ }
+
+ void Run(THttpRequestRef& req);
+
+ public:
+ THttpRequestBuffersPtr BuildRequest() {
+ return new THttpRequestBuffers(RequestBuilder_(Msg_, Loc_));
+ }
+
+ TRequestSettings RequestSettings() {
+ return RequestSettings_;
+ }
+
+ //can create a spare socket in an attempt to decrease connecting time
+ void OnDetectSlowConnecting();
+
+ //remove extra connection on success connec
+ void OnConnect(THttpConn* c);
+
+ //have some response input
+ void OnBeginRead() noexcept {
+ RequestSendedCompletely_ = true;
+ }
+
+ void OnResponse(TAutoPtr<THttpParser>& rsp);
+
+ void OnConnectFailed(THttpConn* c, const TErrorCode& ec);
+ void OnSystemError(THttpConn* c, const TErrorCode& ec);
+ void OnError(THttpConn* c, const TString& errorText);
+
+ bool RequestSendedCompletely() noexcept;
+
+ void Cancel() noexcept;
+
+ private:
+ void NotifyResponse(const TString& resp, const TString& firstLine, const THttpHeaders& headers) {
+ THandleRef h(ReleaseHandler());
+ if (!!h) {
+ h->NotifyResponse(resp, firstLine, headers);
+ }
+ }
+
+ void NotifyError(
+ const TString& errorText,
+ TError::TType errorType = TError::UnknownType,
+ i32 errorCode = 0, i32 systemErrorCode = 0) {
+ NotifyError(new TError(errorText, errorType, errorCode, systemErrorCode));
+ }
+
+ void NotifyError(TErrorRef error, const THttpParser* rsp = nullptr) {
+ THandleRef h(ReleaseHandler());
+ if (!!h) {
+ h->NotifyError(error, rsp);
+ }
+ }
+
+ void Finalize(THttpConn* skipConn = nullptr) noexcept;
+
+ inline THandleRef ReleaseHandler() noexcept {
+ THandleRef h;
+ {
+ TGuard<TSpinLock> g(SL_);
+ h.Swap(Hndl_);
+ }
+ return h;
+ }
+
+ inline THttpConnRef GetConn() noexcept {
+ TGuard<TSpinLock> g(SL_);
+ return Conn_;
+ }
+
+ inline THttpConnRef ReleaseConn() noexcept {
+ THttpConnRef c;
+ {
+ TGuard<TSpinLock> g(SL_);
+ c.Swap(Conn_);
+ }
+ return c;
+ }
+
+ inline THttpConnRef ReleaseConn2() noexcept {
+ THttpConnRef c;
+ {
+ TGuard<TSpinLock> g(SL_);
+ c.Swap(Conn2_);
+ }
+ return c;
+ }
+
+ TSpinLock SL_; //guaranted calling notify() only once (prevent race between asio thread and current)
+ THandleRef Hndl_;
+ TRequestBuilder RequestBuilder_;
+ TRequestSettings RequestSettings_;
+ const TMessage Msg_;
+ const TParsedLocation Loc_;
+ const TResolvedHost* Addr_;
+ TNetworkAddress::TIterator AddrIter_;
+ THttpConnRef Conn_;
+ THttpConnRef Conn2_; //concurrent connection used, if detected slow connecting on first connection
+ TWeakPtrB<THttpRequest> WeakThis_;
+ TAtomicBool Canceled_;
+ TAtomicBool RequestSendedCompletely_;
+ };
+
+ TAtomicCounter* HttpOutConnCounter();
+
+ class THttpConn: public TThrRefBase {
+ public:
+ static THttpConnRef Create(TIOService& srv);
+
+ ~THttpConn() override {
+ DBGOUT("~THttpConn()");
+ Req_.Reset();
+ HttpOutConnCounter()->Dec();
+#ifdef DEBUG_STAT
+ ++TDebugStat::ConnDestroyed;
+#endif
+ }
+
+ void StartRequest(THttpRequestRef req, const TEndpoint& ep, size_t addrId, TDuration slowConn, bool useAsyncSendRequest = false) {
+ {
+ //thread safe linking connection->request
+ TGuard<TSpinLock> g(SL_);
+ Req_ = req;
+ }
+ AddrId_ = addrId;
+ try {
+ TDuration connectDeadline(THttp2Options::ConnectTimeout);
+ if (THttp2Options::ConnectTimeout > slowConn) {
+ //use append non fatal connect deadline, so on first timedout
+ //report about slow connecting to THttpRequest, and continue wait ConnectDeadline_ period
+ connectDeadline = slowConn;
+ ConnectDeadline_ = THttp2Options::ConnectTimeout - slowConn;
+ }
+ DBGOUT("AsyncConnect to " << ep.IpToString());
+ AS_.AsyncConnect(ep, std::bind(&THttpConn::OnConnect, THttpConnRef(this), _1, _2, useAsyncSendRequest), connectDeadline);
+ } catch (...) {
+ ReleaseRequest();
+ throw;
+ }
+ }
+
+ //start next request on keep-alive connection
+ bool StartNextRequest(THttpRequestRef& req, bool useAsyncSendRequest = false) {
+ if (Finalized_) {
+ return false;
+ }
+
+ {
+ //thread safe linking connection->request
+ TGuard<TSpinLock> g(SL_);
+ Req_ = req;
+ }
+
+ RequestWritten_ = false;
+ BeginReadResponse_ = false;
+
+ try {
+ if (!useAsyncSendRequest) {
+ TErrorCode ec;
+ SendRequest(req->BuildRequest(), ec); //throw std::bad_alloc
+ if (ec.Value() == ECANCELED) {
+ OnCancel();
+ } else if (ec) {
+ OnError(ec);
+ }
+ } else {
+ SendRequestAsync(req->BuildRequest()); //throw std::bad_alloc
+ }
+ } catch (...) {
+ OnError(CurrentExceptionMessage());
+ throw;
+ }
+ return true;
+ }
+
+ //connection received from cache must be validated before using
+ //(process removing closed conection from cache consume some time)
+ inline bool IsValid() const noexcept {
+ return !Finalized_;
+ }
+
+ void SetCached(bool v) noexcept {
+ Cached_ = v;
+ }
+
+ void Close() noexcept {
+ try {
+ Cancel();
+ } catch (...) {
+ }
+ }
+
+ void DetachRequest() noexcept {
+ ReleaseRequest();
+ }
+
+ void Cancel() { //throw std::bad_alloc
+ if (!Canceled_) {
+ Canceled_ = true;
+ Finalized_ = true;
+ OnCancel();
+ AS_.AsyncCancel();
+ }
+ }
+
+ void OnCancel() {
+ THttpRequestRef r(ReleaseRequest());
+ if (!!r) {
+ static const TString reqCanceled("request canceled");
+ r->OnError(this, reqCanceled);
+ }
+ }
+
+ bool RequestSendedCompletely() const noexcept {
+ DBGOUT("RequestSendedCompletely()");
+ if (!Connected_ || !RequestWritten_) {
+ return false;
+ }
+ if (BeginReadResponse_) {
+ return true;
+ }
+#if defined(FIONWRITE)
+ if (THttp2Options::EnsureSendingCompleteByAck) {
+ int nbytes = Max<int>();
+ int err = ioctl(AS_.Native(), FIONWRITE, &nbytes);
+ return err ? false : nbytes == 0;
+ }
+#endif
+ return true;
+ }
+
+ TIOService& GetIOService() const noexcept {
+ return AS_.GetIOService();
+ }
+
+ private:
+ THttpConn(TIOService& srv)
+ : AddrId_(0)
+ , AS_(srv)
+ , BuffSize_(THttp2Options::InputBufferSize)
+ , Connected_(false)
+ , Cached_(false)
+ , Canceled_(false)
+ , Finalized_(false)
+ , InAsyncRead_(false)
+ , RequestWritten_(false)
+ , BeginReadResponse_(false)
+ {
+ HttpOutConnCounter()->Inc();
+ }
+
+ //can be called only from asio
+ void OnConnect(const TErrorCode& ec, IHandlingContext& ctx, bool useAsyncSendRequest = false) {
+ DBGOUT("THttpConn::OnConnect: " << ec.Value());
+ if (Y_UNLIKELY(ec)) {
+ if (ec.Value() == ETIMEDOUT && ConnectDeadline_.GetValue()) {
+ //detect slow connecting (yet not reached final timeout)
+ DBGOUT("OnConnectTimingCheck");
+ THttpRequestRef req(GetRequest());
+ if (!req) {
+ return; //cancel from client thread can ahead us
+ }
+ TDuration newDeadline(ConnectDeadline_);
+ ConnectDeadline_ = TDuration::Zero(); //next timeout is final
+
+ req->OnDetectSlowConnecting();
+ //continue wait connect
+ ctx.ContinueUseHandler(newDeadline);
+
+ return;
+ }
+#ifdef DEBUG_STAT
+ if (ec.Value() != ECANCELED) {
+ ++TDebugStat::ConnFailed;
+ } else {
+ ++TDebugStat::ConnConnCanceled;
+ }
+#endif
+ if (ec.Value() == EIO) {
+ //try get more detail error info
+ char buf[1];
+ TErrorCode errConnect;
+ AS_.ReadSome(buf, 1, errConnect);
+ OnConnectFailed(errConnect.Value() ? errConnect : ec);
+ } else if (ec.Value() == ECANCELED) {
+ // not try connecting to next host ip addr, simple fail
+ OnError(ec);
+ } else {
+ OnConnectFailed(ec);
+ }
+ } else {
+ Connected_ = true;
+
+ THttpRequestRef req(GetRequest());
+ if (!req || Canceled_) {
+ return;
+ }
+
+ try {
+ PrepareSocket(AS_.Native(), req->RequestSettings());
+ if (THttp2Options::TcpKeepAlive) {
+ SetKeepAlive(AS_.Native(), true);
+ }
+ } catch (TSystemError& err) {
+ TErrorCode ec2(err.Status());
+ OnError(ec2);
+ return;
+ }
+
+ req->OnConnect(this);
+
+ THttpRequestBuffersPtr ptr(req->BuildRequest());
+ PrepareParser();
+
+ if (!useAsyncSendRequest) {
+ TErrorCode ec3;
+ SendRequest(ptr, ec3);
+ if (ec3) {
+ OnError(ec3);
+ }
+ } else {
+ SendRequestAsync(ptr);
+ }
+ }
+ }
+
+ void PrepareParser() {
+ Prs_ = new THttpParser();
+ Prs_->Prepare();
+ }
+
+ void SendRequest(const THttpRequestBuffersPtr& bfs, TErrorCode& ec) { //throw std::bad_alloc
+ if (!THttp2Options::UseAsyncSendRequest) {
+ size_t amount = AS_.WriteSome(*bfs->GetIOvec(), ec);
+
+ if (ec && ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK && ec.Value() != EINPROGRESS) {
+ return;
+ }
+ ec.Assign(0);
+
+ bfs->GetIOvec()->Proceed(amount);
+
+ if (bfs->GetIOvec()->Complete()) {
+ RequestWritten_ = true;
+ StartRead();
+ } else {
+ SendRequestAsync(bfs);
+ }
+ } else {
+ SendRequestAsync(bfs);
+ }
+ }
+
+ void SendRequestAsync(const THttpRequestBuffersPtr& bfs) {
+ NAsio::TTcpSocket::TSendedData sd(bfs.Release());
+ AS_.AsyncWrite(sd, std::bind(&THttpConn::OnWrite, THttpConnRef(this), _1, _2, _3), THttp2Options::OutputDeadline);
+ }
+
+ void OnWrite(const TErrorCode& err, size_t amount, IHandlingContext& ctx) {
+ Y_UNUSED(amount);
+ Y_UNUSED(ctx);
+ if (err) {
+ OnError(err);
+ } else {
+ DBGOUT("OnWrite()");
+ RequestWritten_ = true;
+ StartRead();
+ }
+ }
+
+ inline void StartRead() {
+ if (!InAsyncRead_ && !Canceled_) {
+ InAsyncRead_ = true;
+ AS_.AsyncPollRead(std::bind(&THttpConn::OnCanRead, THttpConnRef(this), _1, _2), THttp2Options::InputDeadline);
+ }
+ }
+
+ //can be called only from asio
+ void OnReadSome(const TErrorCode& err, size_t bytes, IHandlingContext& ctx) {
+ if (Y_UNLIKELY(err)) {
+ OnError(err);
+ return;
+ }
+ if (!BeginReadResponse_) {
+ //used in MessageSendedCompletely()
+ BeginReadResponse_ = true;
+ THttpRequestRef r(GetRequest());
+ if (!!r) {
+ r->OnBeginRead();
+ }
+ }
+ DBGOUT("receive:" << TStringBuf(Buff_.Get(), bytes));
+ try {
+ if (!Prs_) {
+ throw yexception() << TStringBuf("receive some data while not in request");
+ }
+
+#if defined(_linux_)
+ if (THttp2Options::QuickAck) {
+ SetSockOpt(AS_.Native(), SOL_TCP, TCP_QUICKACK, (int)1);
+ }
+#endif
+
+ DBGOUT("parse:");
+ while (!Prs_->Parse(Buff_.Get(), bytes)) {
+ if (BuffSize_ == bytes) {
+ TErrorCode ec;
+ bytes = AS_.ReadSome(Buff_.Get(), BuffSize_, ec);
+
+ if (!ec) {
+ continue;
+ }
+
+ if (ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK) {
+ OnError(ec);
+
+ return;
+ }
+ }
+ //continue async. read from socket
+ ctx.ContinueUseHandler(THttp2Options::InputDeadline);
+
+ return;
+ }
+
+ //succesfully reach end of http response
+ THttpRequestRef r(ReleaseRequest());
+ if (!r) {
+ //lost race to req. canceling
+ DBGOUT("connection failed");
+ return;
+ }
+
+ DBGOUT("response:");
+ bool keepALive = Prs_->IsKeepAlive();
+
+ r->OnResponse(Prs_);
+
+ if (!keepALive) {
+ return;
+ }
+
+ //continue use connection (keep-alive mode)
+ PrepareParser();
+
+ if (!THttp2Options::KeepInputBufferForCachedConnections) {
+ Buff_.Destroy();
+ }
+ //continue async. read from socket
+ ctx.ContinueUseHandler(THttp2Options::InputDeadline);
+
+ PutSelfToCache();
+ } catch (...) {
+ OnError(CurrentExceptionMessage());
+ }
+ }
+
+ void PutSelfToCache();
+
+ //method for reaction on input data for re-used keep-alive connection,
+ //which free/release buffer when was placed in cache
+ void OnCanRead(const TErrorCode& err, IHandlingContext& ctx) {
+ if (Y_UNLIKELY(err)) {
+ OnError(err);
+ } else {
+ if (!Buff_) {
+ Buff_.Reset(new char[BuffSize_]);
+ }
+ TErrorCode ec;
+ OnReadSome(ec, AS_.ReadSome(Buff_.Get(), BuffSize_, ec), ctx);
+ }
+ }
+
+ //unlink connection and request, thread-safe mark connection as non valid
+ inline THttpRequestRef GetRequest() noexcept {
+ TGuard<TSpinLock> g(SL_);
+ return Req_;
+ }
+
+ inline THttpRequestRef ReleaseRequest() noexcept {
+ THttpRequestRef r;
+ {
+ TGuard<TSpinLock> g(SL_);
+ r.Swap(Req_);
+ }
+ return r;
+ }
+
+ void OnConnectFailed(const TErrorCode& ec);
+
+ inline void OnError(const TErrorCode& ec) {
+ OnError(ec.Text());
+ }
+
+ inline void OnError(const TString& errText);
+
+ size_t AddrId_;
+ NAsio::TTcpSocket AS_;
+ TArrayHolder<char> Buff_; //input buffer
+ const size_t BuffSize_;
+
+ TAutoPtr<THttpParser> Prs_; //input data parser & parsed info storage
+
+ TSpinLock SL_;
+ THttpRequestRef Req_; //current request
+ TDuration ConnectDeadline_;
+ TAtomicBool Connected_;
+ TAtomicBool Cached_;
+ TAtomicBool Canceled_;
+ TAtomicBool Finalized_;
+
+ bool InAsyncRead_;
+ TAtomicBool RequestWritten_;
+ TAtomicBool BeginReadResponse_;
+ };
+
+ //conn limits monitoring, cache clean, contain used in http clients asio threads/executors
+ class THttpConnManager: public IThreadFactory::IThreadAble {
+ public:
+ THttpConnManager()
+ : TotalConn(0)
+ , EP_(THttp2Options::AsioThreads)
+ , InPurging_(0)
+ , MaxConnId_(0)
+ , Shutdown_(false)
+ {
+ T_ = SystemThreadFactory()->Run(this);
+ Limits.SetSoft(40000);
+ Limits.SetHard(50000);
+ }
+
+ ~THttpConnManager() override {
+ {
+ TGuard<TMutex> g(PurgeMutex_);
+
+ Shutdown_ = true;
+ CondPurge_.Signal();
+ }
+
+ EP_.SyncShutdown();
+
+ T_->Join();
+ }
+
+ inline void SetLimits(size_t softLimit, size_t hardLimit) noexcept {
+ Limits.SetSoft(softLimit);
+ Limits.SetHard(hardLimit);
+ }
+
+ inline std::pair<size_t, size_t> GetLimits() const noexcept {
+ return {Limits.Soft(), Limits.Hard()};
+ }
+
+ inline void CheckLimits() {
+ if (ExceedSoftLimit()) {
+ SuggestPurgeCache();
+
+ if (ExceedHardLimit()) {
+ Y_FAIL("neh::http2 output connections limit reached");
+ //ythrow yexception() << "neh::http2 output connections limit reached";
+ }
+ }
+ }
+
+ inline bool Get(THttpConnRef& conn, size_t addrId) {
+#ifdef DEBUG_STAT
+ TDebugStat::ConnTotal.store(TotalConn.Val(), std::memory_order_release);
+ TDebugStat::ConnActive(Active(), std::memory_order_release);
+ TDebugStat::ConnCached(Cache_.Size(), std::memory_order_release);
+#endif
+ return Cache_.Get(conn, addrId);
+ }
+
+ inline void Put(THttpConnRef& conn, size_t addrId) {
+ if (Y_LIKELY(!Shutdown_ && !ExceedHardLimit() && !CacheDisabled())) {
+ if (Y_UNLIKELY(addrId > (size_t)AtomicGet(MaxConnId_))) {
+ AtomicSet(MaxConnId_, addrId);
+ }
+ Cache_.Put(conn, addrId);
+ } else {
+ conn->Close();
+ conn.Drop();
+ }
+ }
+
+ inline size_t OnConnError(size_t addrId) {
+ return Cache_.Validate(addrId);
+ }
+
+ TIOService& GetIOService() {
+ return EP_.GetExecutor().GetIOService();
+ }
+
+ bool CacheDisabled() const {
+ return Limits.Soft() == 0;
+ }
+
+ bool IsShutdown() const noexcept {
+ return Shutdown_;
+ }
+
+ TAtomicCounter TotalConn;
+
+ private:
+ inline size_t Total() const noexcept {
+ return TotalConn.Val();
+ }
+
+ inline size_t Active() const noexcept {
+ return TFdLimits::ExceedLimit(Total(), Cache_.Size());
+ }
+
+ inline size_t ExceedSoftLimit() const noexcept {
+ return TFdLimits::ExceedLimit(Total(), Limits.Soft());
+ }
+
+ inline size_t ExceedHardLimit() const noexcept {
+ return TFdLimits::ExceedLimit(Total(), Limits.Hard());
+ }
+
+ void SuggestPurgeCache() {
+ if (AtomicTryLock(&InPurging_)) {
+ //evaluate the usefulness of purging the cache
+ //если в кеше мало соединений (< MaxConnId_/16 или 64), не чистим кеш
+ if (Cache_.Size() > (Min((size_t)AtomicGet(MaxConnId_), (size_t)1024U) >> 4)) {
+ //по мере приближения к hardlimit нужда в чистке cache приближается к 100%
+ size_t closenessToHardLimit256 = ((Active() + 1) << 8) / (Limits.Delta() + 1);
+ //чем больше соединений в кеше, а не в работе, тем менее нужен кеш (можно его почистить)
+ size_t cacheUselessness256 = ((Cache_.Size() + 1) << 8) / (Active() + 1);
+
+ //итого, - пороги срабатывания:
+ //при достижении soft-limit, если соединения в кеше, а не в работе
+ //на полпути от soft-limit к hard-limit, если в кеше больше половины соединений
+ //при приближении к hardlimit пытаться почистить кеш почти постоянно
+ if ((closenessToHardLimit256 + cacheUselessness256) >= 256U) {
+ TGuard<TMutex> g(PurgeMutex_);
+
+ CondPurge_.Signal();
+ return; //memo: thread MUST unlock InPurging_ (see DoExecute())
+ }
+ }
+ AtomicUnlock(&InPurging_);
+ }
+ }
+
+ void DoExecute() override {
+ while (true) {
+ {
+ TGuard<TMutex> g(PurgeMutex_);
+
+ if (Shutdown_)
+ return;
+
+ CondPurge_.WaitI(PurgeMutex_);
+ }
+
+ PurgeCache();
+
+ AtomicUnlock(&InPurging_);
+ }
+ }
+
+ void PurgeCache() noexcept {
+ //try remove at least ExceedSoftLimit() oldest connections from cache
+ //вычисляем долю кеша, которую нужно почистить (в 256 долях) (но не менее 1/32 кеша)
+ size_t frac256 = Min(size_t(Max(size_t(256U / 32U), (ExceedSoftLimit() << 8) / (Cache_.Size() + 1))), (size_t)256U);
+
+ size_t processed = 0;
+ size_t maxConnId = AtomicGet(MaxConnId_);
+ for (size_t i = 0; i <= maxConnId && !Shutdown_; ++i) {
+ processed += Cache_.Purge(i, frac256);
+ if (processed > 32) {
+#ifdef DEBUG_STAT
+ TDebugStat::ConnPurgedInCache += processed;
+#endif
+ processed = 0;
+ Sleep(TDuration::MilliSeconds(10)); //prevent big spike cpu/system usage
+ }
+ }
+ }
+
+ TFdLimits Limits;
+ TExecutorsPool EP_;
+
+ TConnCache<THttpConn> Cache_;
+ TAtomic InPurging_;
+ TAtomic MaxConnId_;
+
+ TAutoPtr<IThreadFactory::IThread> T_;
+ TCondVar CondPurge_;
+ TMutex PurgeMutex_;
+ TAtomicBool Shutdown_;
+ };
+
+ THttpConnManager* HttpConnManager() {
+ return Singleton<THttpConnManager>();
+ }
+
+ TAtomicCounter* HttpOutConnCounter() {
+ return &HttpConnManager()->TotalConn;
+ }
+
+ THttpConnRef THttpConn::Create(TIOService& srv) {
+ if (HttpConnManager()->IsShutdown()) {
+ throw yexception() << "can't create connection with shutdowned service";
+ }
+
+ return new THttpConn(srv);
+ }
+
+ void THttpConn::PutSelfToCache() {
+ THttpConnRef c(this);
+ HttpConnManager()->Put(c, AddrId_);
+ }
+
+ void THttpConn::OnConnectFailed(const TErrorCode& ec) {
+ THttpRequestRef r(GetRequest());
+ if (!!r) {
+ r->OnConnectFailed(this, ec);
+ }
+ OnError(ec);
+ }
+
+ void THttpConn::OnError(const TString& errText) {
+ Finalized_ = true;
+ if (Connected_) {
+ Connected_ = false;
+ TErrorCode ec;
+ AS_.Shutdown(NAsio::TTcpSocket::ShutdownBoth, ec);
+ } else {
+ if (AS_.IsOpen()) {
+ AS_.AsyncCancel();
+ }
+ }
+ THttpRequestRef r(ReleaseRequest());
+ if (!!r) {
+ r->OnError(this, errText);
+ } else {
+ if (Cached_) {
+ size_t res = HttpConnManager()->OnConnError(AddrId_);
+ Y_UNUSED(res);
+#ifdef DEBUG_STAT
+ TDebugStat::ConnDestroyedInCache += res;
+#endif
+ }
+ }
+ }
+
+ void THttpRequest::Run(THttpRequestRef& req) {
+#ifdef DEBUG_STAT
+ if ((++TDebugStat::RequestTotal & 0xFFF) == 0) {
+ TDebugStat::Print();
+ }
+#endif
+ try {
+ while (!Canceled_) {
+ THttpConnRef conn;
+ if (HttpConnManager()->Get(conn, Addr_->Id)) {
+ DBGOUT("Use connection from cache");
+ Conn_ = conn; //thread magic
+ if (!conn->StartNextRequest(req, RequestSettings_.UseAsyncSendRequest)) {
+ continue; //if use connection from cache, ignore write error and try another conn
+ }
+ } else {
+ HttpConnManager()->CheckLimits(); //here throw exception if reach hard limit (or atexit() state)
+ Conn_ = THttpConn::Create(HttpConnManager()->GetIOService());
+ TEndpoint ep(new NAddr::TAddrInfo(&*AddrIter_));
+ Conn_->StartRequest(req, ep, Addr_->Id, THttp2Options::SymptomSlowConnect); // can throw
+ }
+ break;
+ }
+ } catch (...) {
+ Conn_.Reset();
+ throw;
+ }
+ }
+
+ //it seems we have lost TCP SYN packet, create extra connection for decrease response time
+ void THttpRequest::OnDetectSlowConnecting() {
+#ifdef DEBUG_STAT
+ ++TDebugStat::ConnSlow;
+#endif
+ //use some io_service (Run() thread-executor), from first conn. for more thread safety
+ THttpConnRef conn = GetConn();
+
+ if (!conn) {
+ return;
+ }
+
+ THttpConnRef conn2;
+ try {
+ conn2 = THttpConn::Create(conn->GetIOService());
+ } catch (...) {
+ return; // cant create spare connection, simple continue use only main
+ }
+
+ {
+ TGuard<TSpinLock> g(SL_);
+ Conn2_ = conn2;
+ }
+
+ if (Y_UNLIKELY(Canceled_)) {
+ ReleaseConn2();
+ } else {
+ //use connect timeout for disable detecting slow connecting on second conn.
+ TEndpoint ep(new NAddr::TAddrInfo(&*Addr_->Addr.Begin()));
+ try {
+ conn2->StartRequest(WeakThis_, ep, Addr_->Id, THttp2Options::ConnectTimeout);
+ } catch (...) {
+ // ignore errors on spare connection
+ ReleaseConn2();
+ }
+ }
+ }
+
+ void THttpRequest::OnConnect(THttpConn* c) {
+ THttpConnRef extraConn;
+ {
+ TGuard<TSpinLock> g(SL_);
+ if (Y_UNLIKELY(!!Conn2_)) {
+ //has pair concurrent conn, 'should stay only one'
+ if (Conn2_.Get() == c) {
+#ifdef DEBUG_STAT
+ ++TDebugStat::Conn2Success;
+#endif
+ Conn2_.Swap(Conn_);
+ }
+ extraConn.Swap(Conn2_);
+ }
+ }
+ if (!!extraConn) {
+ extraConn->DetachRequest(); //prevent call OnError()
+ extraConn->Close();
+ }
+ }
+
+ void THttpRequest::OnResponse(TAutoPtr<THttpParser>& rsp) {
+ DBGOUT("THttpRequest::OnResponse()");
+ ReleaseConn();
+ if (Y_LIKELY(((rsp->RetCode() >= 200 && rsp->RetCode() < (!THttp2Options::RedirectionNotError ? 300 : 400)) || THttp2Options::AnyResponseIsNotError))) {
+ NotifyResponse(rsp->DecodedContent(), rsp->FirstLine(), rsp->Headers());
+ } else {
+ TString message;
+
+ if (THttp2Options::FullHeadersAsErrorMessage) {
+ TStringStream err;
+ err << rsp->FirstLine();
+
+ THttpHeaders hdrs = rsp->Headers();
+ for (auto h = hdrs.begin(); h < hdrs.end(); h++) {
+ err << h->ToString() << TStringBuf("\r\n");
+ }
+
+ message = err.Str();
+ } else if (THttp2Options::UseResponseAsErrorMessage) {
+ message = rsp->DecodedContent();
+ } else {
+ TStringStream err;
+ err << TStringBuf("request failed(") << rsp->FirstLine() << TStringBuf(")");
+ message = err.Str();
+ }
+
+ NotifyError(new TError(message, TError::ProtocolSpecific, rsp->RetCode()), rsp.Get());
+ }
+ }
+
+ void THttpRequest::OnConnectFailed(THttpConn* c, const TErrorCode& ec) {
+ DBGOUT("THttpRequest::OnConnectFailed()");
+ //detach/discard failed conn, try connect to next ip addr (if can)
+ THttpConnRef cc(GetConn());
+ if (c != cc.Get() || AddrIter_ == Addr_->Addr.End() || ++AddrIter_ == Addr_->Addr.End() || Canceled_) {
+ return OnSystemError(c, ec);
+ }
+ // can try next host addr
+ c->DetachRequest();
+ c->Close();
+ THttpConnRef nextConn;
+ try {
+ nextConn = THttpConn::Create(HttpConnManager()->GetIOService());
+ } catch (...) {
+ OnSystemError(nullptr, ec);
+ return;
+ }
+ {
+ THttpConnRef nc = nextConn;
+ TGuard<TSpinLock> g(SL_);
+ Conn_.Swap(nc);
+ }
+ TEndpoint ep(new NAddr::TAddrInfo(&*AddrIter_));
+ try {
+ nextConn->StartRequest(WeakThis_, ep, Addr_->Id, THttp2Options::SymptomSlowConnect);
+ } catch (...) {
+ OnError(nullptr, CurrentExceptionMessage());
+ return;
+ }
+
+ if (Canceled_) {
+ OnError(nullptr, "canceled");
+ }
+ }
+
+ void THttpRequest::OnSystemError(THttpConn* c, const TErrorCode& ec) {
+ DBGOUT("THttpRequest::OnSystemError()");
+ NotifyError(ec.Text(), TError::TType::UnknownType, 0, ec.Value());
+ Finalize(c);
+ }
+
+ void THttpRequest::OnError(THttpConn* c, const TString& errorText) {
+ DBGOUT("THttpRequest::OnError()");
+ NotifyError(errorText);
+ Finalize(c);
+ }
+
+ bool THttpRequest::RequestSendedCompletely() noexcept {
+ if (RequestSendedCompletely_) {
+ return true;
+ }
+
+ THttpConnRef c(GetConn());
+ return !!c ? c->RequestSendedCompletely() : false;
+ }
+
+ void THttpRequest::Cancel() noexcept {
+ if (!Canceled_) {
+ Canceled_ = true;
+ try {
+ static const TString canceled("Canceled");
+ NotifyError(canceled, TError::Cancelled);
+ Finalize();
+ } catch (...) {
+ }
+ }
+ }
+
+ inline void FinalizeConn(THttpConnRef& c, THttpConn* skipConn) noexcept {
+ if (!!c && c.Get() != skipConn) {
+ c->DetachRequest();
+ c->Close();
+ }
+ }
+
+ void THttpRequest::Finalize(THttpConn* skipConn) noexcept {
+ THttpConnRef c1(ReleaseConn());
+ FinalizeConn(c1, skipConn);
+ THttpConnRef c2(ReleaseConn2());
+ FinalizeConn(c2, skipConn);
+ }
+
+ /////////////////////////////////// server side ////////////////////////////////////
+
+ TAtomicCounter* HttpInConnCounter() {
+ return Singleton<TAtomicCounter>();
+ }
+
+ TFdLimits* HttpInConnLimits() {
+ return Singleton<TFdLimits>();
+ }
+
+ class THttpServer: public IRequester {
+ typedef TAutoPtr<TTcpAcceptor> TTcpAcceptorPtr;
+ typedef TAtomicSharedPtr<TTcpSocket> TTcpSocketRef;
+ class TConn;
+ typedef TSharedPtrB<TConn> TConnRef;
+
+ class TRequest: public IHttpRequest {
+ public:
+ TRequest(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser>& p)
+ : C_(c)
+ , P_(p)
+ , RemoteHost_(C_->RemoteHost())
+ , CompressionScheme_(P_->GetBestCompressionScheme())
+ , H_(TStringBuf(P_->FirstLine()))
+ {
+ }
+
+ ~TRequest() override {
+ if (!!C_) {
+ try {
+ C_->SendError(Id(), 503, "service unavailable (request ignored)", P_->HttpVersion(), {});
+ } catch (...) {
+ DBGOUT("~TRequest()::SendFail() exception");
+ }
+ }
+ }
+
+ TAtomicBase Id() const {
+ return Id_;
+ }
+
+ protected:
+ TStringBuf Scheme() const override {
+ return TStringBuf("http");
+ }
+
+ TString RemoteHost() const override {
+ return RemoteHost_;
+ }
+
+ TStringBuf Service() const override {
+ return TStringBuf(H_.Path).Skip(1);
+ }
+
+ const THttpHeaders& Headers() const override {
+ return P_->Headers();
+ }
+
+ TStringBuf Method() const override {
+ return H_.Method;
+ }
+
+ TStringBuf Body() const override {
+ return P_->DecodedContent();
+ }
+
+ TStringBuf Cgi() const override {
+ return H_.Cgi;
+ }
+
+ TStringBuf RequestId() const override {
+ return TStringBuf();
+ }
+
+ bool Canceled() const override {
+ if (!C_) {
+ return false;
+ }
+ return C_->IsCanceled();
+ }
+
+ void SendReply(TData& data) override {
+ SendReply(data, TString(), HttpCodes::HTTP_OK);
+ }
+
+ void SendReply(TData& data, const TString& headers, int httpCode) override {
+ if (!!C_) {
+ C_->Send(Id(), data, CompressionScheme_, P_->HttpVersion(), headers, httpCode);
+ C_.Reset();
+ }
+ }
+
+ void SendError(TResponseError err, const THttpErrorDetails& details) override {
+ static const unsigned errorToHttpCode[IRequest::MaxResponseError] =
+ {
+ 400,
+ 403,
+ 404,
+ 429,
+ 500,
+ 501,
+ 502,
+ 503,
+ 509};
+
+ if (!!C_) {
+ C_->SendError(Id(), errorToHttpCode[err], details.Details, P_->HttpVersion(), details.Headers);
+ C_.Reset();
+ }
+ }
+
+ static TAtomicBase NextId() {
+ static TAtomic idGenerator = 0;
+ TAtomicBase id = 0;
+ do {
+ id = AtomicIncrement(idGenerator);
+ } while (!id);
+ return id;
+ }
+
+ TSharedPtrB<TConn> C_;
+ TAutoPtr<THttpParser> P_;
+ TString RemoteHost_;
+ TString CompressionScheme_;
+ TParsedHttpFull H_;
+ TAtomicBase Id_ = NextId();
+ };
+
+ class TRequestGet: public TRequest {
+ public:
+ TRequestGet(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser> p)
+ : TRequest(c, p)
+ {
+ }
+
+ TStringBuf Data() const override {
+ return H_.Cgi;
+ }
+ };
+
+ class TRequestPost: public TRequest {
+ public:
+ TRequestPost(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser> p)
+ : TRequest(c, p)
+ {
+ }
+
+ TStringBuf Data() const override {
+ return P_->DecodedContent();
+ }
+ };
+
+ class TConn {
+ private:
+ TConn(THttpServer& hs, const TTcpSocketRef& s)
+ : HS_(hs)
+ , AS_(s)
+ , RemoteHost_(NNeh::PrintHostByRfc(*AS_->RemoteEndpoint().Addr()))
+ , BuffSize_(THttp2Options::InputBufferSize)
+ , Buff_(new char[BuffSize_])
+ , Canceled_(false)
+ , LeftRequestsToDisconnect_(hs.LimitRequestsPerConnection)
+ {
+ DBGOUT("THttpServer::TConn()");
+ HS_.OnCreateConn();
+ }
+
+ inline TConnRef SelfRef() noexcept {
+ return WeakThis_;
+ }
+
+ public:
+ static void Create(THttpServer& hs, const TTcpSocketRef& s) {
+ TSharedPtrB<TConn> conn(new TConn(hs, s));
+ conn->WeakThis_ = conn;
+ conn->ExpectNewRequest();
+ conn->AS_->AsyncPollRead(std::bind(&TConn::OnCanRead, conn, _1, _2), THttp2Options::ServerInputDeadline);
+ }
+
+ ~TConn() {
+ DBGOUT("~THttpServer::TConn(" << (!AS_ ? -666 : AS_->Native()) << ")");
+ HS_.OnDestroyConn();
+ }
+
+ private:
+ void ExpectNewRequest() {
+ P_.Reset(new THttpParser(THttpParser::Request));
+ P_->Prepare();
+ }
+
+ void OnCanRead(const TErrorCode& ec, IHandlingContext& ctx) {
+ if (ec) {
+ OnError();
+ } else {
+ TErrorCode ec2;
+ OnReadSome(ec2, AS_->ReadSome(Buff_.Get(), BuffSize_, ec2), ctx);
+ }
+ }
+
+ void OnError() {
+ DBGOUT("Srv OnError(" << (!AS_ ? -666 : AS_->Native()) << ")");
+ Canceled_ = true;
+ AS_->AsyncCancel();
+ }
+
+ void OnReadSome(const TErrorCode& ec, size_t amount, IHandlingContext& ctx) {
+ if (ec || !amount) {
+ OnError();
+
+ return;
+ }
+
+ DBGOUT("ReadSome(" << (!AS_ ? -666 : AS_->Native()) << "): " << amount);
+ try {
+ size_t buffPos = 0;
+ //DBGOUT("receive and parse: " << TStringBuf(Buff_.Get(), amount));
+ while (P_->Parse(Buff_.Get() + buffPos, amount - buffPos)) {
+ SeenMessageWithoutKeepalive_ |= !P_->IsKeepAlive() || LeftRequestsToDisconnect_ == 1;
+ char rt = *P_->FirstLine().data();
+ const size_t extraDataSize = P_->GetExtraDataSize();
+ if (rt == 'P' || rt == 'p') {
+ OnRequest(new TRequestPost(WeakThis_, P_));
+ } else {
+ OnRequest(new TRequestGet(WeakThis_, P_));
+ }
+ if (extraDataSize) {
+ // has http pipelining
+ buffPos = amount - extraDataSize;
+ ExpectNewRequest();
+ } else {
+ ExpectNewRequest();
+ ctx.ContinueUseHandler(HS_.GetKeepAliveTimeout());
+ return;
+ }
+ }
+ ctx.ContinueUseHandler(THttp2Options::ServerInputDeadline);
+ } catch (...) {
+ OnError();
+ }
+ }
+
+ void OnRequest(TRequest* r) {
+ DBGOUT("OnRequest()");
+ if (AtomicGet(PrimaryResponse_)) {
+ // has pipelining
+ PipelineOrder_.Enqueue(r->Id());
+ } else {
+ AtomicSet(PrimaryResponse_, r->Id());
+ }
+ HS_.OnRequest(r);
+ OnRequestDone();
+ }
+
+ void OnRequestDone() {
+ DBGOUT("OnRequestDone()");
+ if (LeftRequestsToDisconnect_ > 0) {
+ --LeftRequestsToDisconnect_;
+ }
+ }
+
+ static void PrintHttpVersion(IOutputStream& out, const THttpVersion& ver) {
+ out << TStringBuf("HTTP/") << ver.Major << TStringBuf(".") << ver.Minor;
+ }
+
+ struct TResponseData : TThrRefBase {
+ TResponseData(size_t reqId, TTcpSocket::TSendedData data)
+ : RequestId_(reqId)
+ , Data_(data)
+ {
+ }
+
+ size_t RequestId_;
+ TTcpSocket::TSendedData Data_;
+ };
+ typedef TIntrusivePtr<TResponseData> TResponseDataRef;
+
+ public:
+ //called non thread-safe (from outside thread)
+ void Send(TAtomicBase requestId, TData& data, const TString& compressionScheme, const THttpVersion& ver, const TString& headers, int httpCode) {
+ class THttpResponseFormatter {
+ public:
+ THttpResponseFormatter(TData& theData, const TString& contentEncoding, const THttpVersion& theVer, const TString& theHeaders, int theHttpCode, bool closeConnection) {
+ Header.Reserve(128 + contentEncoding.size() + theHeaders.size());
+ PrintHttpVersion(Header, theVer);
+ Header << TStringBuf(" ") << theHttpCode << ' ' << HttpCodeStr(theHttpCode);
+ if (Compress(theData, contentEncoding)) {
+ Header << TStringBuf("\r\nContent-Encoding: ") << contentEncoding;
+ }
+ Header << TStringBuf("\r\nContent-Length: ") << theData.size();
+ if (closeConnection) {
+ Header << TStringBuf("\r\nConnection: close");
+ } else if (Y_LIKELY(theVer.Major > 1 || theVer.Minor > 0)) {
+ // since HTTP/1.1 Keep-Alive is default behaviour
+ Header << TStringBuf("\r\nConnection: Keep-Alive");
+ }
+ if (theHeaders) {
+ Header << theHeaders;
+ }
+ Header << TStringBuf("\r\n\r\n");
+
+ Body.swap(theData);
+
+ Parts[0].buf = Header.Data();
+ Parts[0].len = Header.Size();
+ Parts[1].buf = Body.data();
+ Parts[1].len = Body.size();
+ }
+
+ TStringStream Header;
+ TData Body;
+ IOutputStream::TPart Parts[2];
+ };
+
+ class TBuffers: public THttpResponseFormatter, public TTcpSocket::IBuffers {
+ public:
+ TBuffers(TData& theData, const TString& contentEncoding, const THttpVersion& theVer, const TString& theHeaders, int theHttpCode, bool closeConnection)
+ : THttpResponseFormatter(theData, contentEncoding, theVer, theHeaders, theHttpCode, closeConnection)
+ , IOVec(Parts, 2)
+ {
+ }
+
+ TContIOVector* GetIOvec() override {
+ return &IOVec;
+ }
+
+ TContIOVector IOVec;
+ };
+
+ TTcpSocket::TSendedData sd(new TBuffers(data, compressionScheme, ver, headers, httpCode, SeenMessageWithoutKeepalive_));
+ SendData(requestId, sd);
+ }
+
+ //called non thread-safe (from outside thread)
+ void SendError(TAtomicBase requestId, unsigned httpCode, const TString& descr, const THttpVersion& ver, const TString& headers) {
+ if (Canceled_) {
+ return;
+ }
+
+ class THttpErrorResponseFormatter {
+ public:
+ THttpErrorResponseFormatter(unsigned theHttpCode, const TString& theDescr, const THttpVersion& theVer, bool closeConnection, const TString& headers) {
+ PrintHttpVersion(Answer, theVer);
+ Answer << TStringBuf(" ") << theHttpCode << TStringBuf(" ");
+ if (theDescr.size() && !THttp2Options::ErrorDetailsAsResponseBody) {
+ // Reason-Phrase = *<TEXT, excluding CR, LF>
+ // replace bad chars to '.'
+ TString reasonPhrase = theDescr;
+ for (TString::iterator it = reasonPhrase.begin(); it != reasonPhrase.end(); ++it) {
+ char& ch = *it;
+ if (ch == ' ') {
+ continue;
+ }
+ if (((ch & 31) == ch) || static_cast<unsigned>(ch) == 127 || (static_cast<unsigned>(ch) & 0x80)) {
+ //CTLs || DEL(127) || non ascii
+ // (ch <= 32) || (ch >= 127)
+ ch = '.';
+ }
+ }
+ Answer << reasonPhrase;
+ } else {
+ Answer << HttpCodeStr(static_cast<int>(theHttpCode));
+ }
+
+ if (closeConnection) {
+ Answer << TStringBuf("\r\nConnection: close");
+ }
+
+ if (headers) {
+ Answer << "\r\n" << headers;
+ }
+
+ if (THttp2Options::ErrorDetailsAsResponseBody) {
+ Answer << TStringBuf("\r\nContent-Length:") << theDescr.size() << "\r\n\r\n" << theDescr;
+ } else {
+ Answer << "\r\n"
+ "Content-Length:0\r\n\r\n"sv;
+ }
+
+ Parts[0].buf = Answer.Data();
+ Parts[0].len = Answer.Size();
+ }
+
+ TStringStream Answer;
+ IOutputStream::TPart Parts[1];
+ };
+
+ class TBuffers: public THttpErrorResponseFormatter, public TTcpSocket::IBuffers {
+ public:
+ TBuffers(
+ unsigned theHttpCode,
+ const TString& theDescr,
+ const THttpVersion& theVer,
+ bool closeConnection,
+ const TString& headers
+ )
+ : THttpErrorResponseFormatter(theHttpCode, theDescr, theVer, closeConnection, headers)
+ , IOVec(Parts, 1)
+ {
+ }
+
+ TContIOVector* GetIOvec() override {
+ return &IOVec;
+ }
+
+ TContIOVector IOVec;
+ };
+
+ TTcpSocket::TSendedData sd(new TBuffers(httpCode, descr, ver, SeenMessageWithoutKeepalive_, headers));
+ SendData(requestId, sd);
+ }
+
+ void ProcessPipeline() {
+ // on successfull response to current (PrimaryResponse_) request
+ TAtomicBase requestId;
+ if (PipelineOrder_.Dequeue(&requestId)) {
+ TAtomicBase oldReqId;
+ do {
+ oldReqId = AtomicGet(PrimaryResponse_);
+ Y_VERIFY(oldReqId, "race inside http pipelining");
+ } while (!AtomicCas(&PrimaryResponse_, requestId, oldReqId));
+
+ ProcessResponsesData();
+ } else {
+ TAtomicBase oldReqId = AtomicGet(PrimaryResponse_);
+ if (oldReqId) {
+ while (!AtomicCas(&PrimaryResponse_, 0, oldReqId)) {
+ Y_VERIFY(oldReqId == AtomicGet(PrimaryResponse_), "race inside http pipelining [2]");
+ }
+ }
+ }
+ }
+
+ void ProcessResponsesData() {
+ // process responses data queue, send response (if already have next PrimaryResponse_)
+ TResponseDataRef rd;
+ while (ResponsesDataQueue_.Dequeue(&rd)) {
+ ResponsesData_[rd->RequestId_] = rd;
+ }
+ TAtomicBase requestId = AtomicGet(PrimaryResponse_);
+ if (requestId) {
+ THashMap<TAtomicBase, TResponseDataRef>::iterator it = ResponsesData_.find(requestId);
+ if (it != ResponsesData_.end()) {
+ // has next primary response
+ rd = it->second;
+ ResponsesData_.erase(it);
+ AS_->AsyncWrite(rd->Data_, std::bind(&TConn::OnSend, SelfRef(), _1, _2, _3), THttp2Options::ServerOutputDeadline);
+ }
+ }
+ }
+
+ private:
+ void SendData(TAtomicBase requestId, TTcpSocket::TSendedData sd) {
+ TContIOVector& vec = *sd->GetIOvec();
+
+ if (requestId != AtomicGet(PrimaryResponse_)) {
+ // already has another request for response first, so push this to queue
+ // + enqueue event for safe checking queue (at local/transport thread)
+ TResponseDataRef rdr = new TResponseData(requestId, sd);
+ ResponsesDataQueue_.Enqueue(rdr);
+ AS_->GetIOService().Post(std::bind(&TConn::ProcessResponsesData, SelfRef()));
+ return;
+ }
+ if (THttp2Options::ServerUseDirectWrite) {
+ vec.Proceed(AS_->WriteSome(vec));
+ }
+ if (!vec.Complete()) {
+ DBGOUT("AsyncWrite()");
+ AS_->AsyncWrite(sd, std::bind(&TConn::OnSend, SelfRef(), _1, _2, _3), THttp2Options::ServerOutputDeadline);
+ } else {
+ // run ProcessPipeline at safe thread
+ AS_->GetIOService().Post(std::bind(&TConn::ProcessPipeline, SelfRef()));
+ }
+ }
+
+ void OnSend(const TErrorCode& ec, size_t amount, IHandlingContext&) {
+ Y_UNUSED(amount);
+ if (ec) {
+ OnError();
+ } else {
+ ProcessPipeline();
+ }
+
+ if (SeenMessageWithoutKeepalive_) {
+ TErrorCode shutdown_ec;
+ AS_->Shutdown(TTcpSocket::ShutdownBoth, shutdown_ec);
+ }
+ }
+
+ public:
+ bool IsCanceled() const noexcept {
+ return Canceled_;
+ }
+
+ const TString& RemoteHost() const noexcept {
+ return RemoteHost_;
+ }
+
+ private:
+ TWeakPtrB<TConn> WeakThis_;
+ THttpServer& HS_;
+ TTcpSocketRef AS_;
+ TString RemoteHost_;
+ size_t BuffSize_;
+ TArrayHolder<char> Buff_;
+ TAutoPtr<THttpParser> P_;
+ // pipeline supporting
+ TAtomic PrimaryResponse_ = 0;
+ TLockFreeQueue<TAtomicBase> PipelineOrder_;
+ TLockFreeQueue<TResponseDataRef> ResponsesDataQueue_;
+ THashMap<TAtomicBase, TResponseDataRef> ResponsesData_;
+
+ TAtomicBool Canceled_;
+ bool SeenMessageWithoutKeepalive_ = false;
+
+ i32 LeftRequestsToDisconnect_ = -1;
+ };
+
+ ///////////////////////////////////////////////////////////
+
+ public:
+ THttpServer(IOnRequest* cb, const TParsedLocation& loc)
+ : E_(THttp2Options::AsioServerThreads)
+ , CB_(cb)
+ , LimitRequestsPerConnection(THttp2Options::LimitRequestsPerConnection)
+ {
+ TNetworkAddress addr(loc.GetPort());
+
+ for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) {
+ TEndpoint ep(new NAddr::TAddrInfo(&*it));
+ TTcpAcceptorPtr a(new TTcpAcceptor(AcceptExecutor_.GetIOService()));
+ DBGOUT("bind:" << ep.IpToString() << ":" << ep.Port());
+ a->Bind(ep);
+ a->Listen(THttp2Options::Backlog);
+ StartAccept(a.Get());
+ A_.push_back(a);
+ }
+ }
+
+ ~THttpServer() override {
+ AcceptExecutor_.SyncShutdown(); //cancel operation for all current sockets (include acceptors)
+ A_.clear(); //stop listening
+ E_.SyncShutdown();
+ }
+
+ void OnAccept(TTcpAcceptor* a, TAtomicSharedPtr<TTcpSocket> s, const TErrorCode& ec, IHandlingContext&) {
+ if (Y_UNLIKELY(ec)) {
+ if (ec.Value() == ECANCELED) {
+ return;
+ } else if (ec.Value() == EMFILE || ec.Value() == ENFILE || ec.Value() == ENOMEM || ec.Value() == ENOBUFS) {
+ //reach some os limit, suspend accepting
+ TAtomicSharedPtr<TDeadlineTimer> dt(new TDeadlineTimer(a->GetIOService()));
+ dt->AsyncWaitExpireAt(TDuration::Seconds(30), std::bind(&THttpServer::OnTimeoutSuspendAccept, this, a, dt, _1, _2));
+ return;
+ } else {
+ Cdbg << "acc: " << ec.Text() << Endl;
+ }
+ } else {
+ if (static_cast<size_t>(HttpInConnCounter()->Val()) < HttpInConnLimits()->Hard()) {
+ try {
+ SetNonBlock(s->Native());
+ PrepareSocket(s->Native());
+ TConn::Create(*this, s);
+ } catch (TSystemError& err) {
+ TErrorCode ec2(err.Status());
+ Cdbg << "acc: " << ec2.Text() << Endl;
+ }
+ } //else accepted socket will be closed
+ }
+ StartAccept(a); //continue accepting
+ }
+
+ void OnTimeoutSuspendAccept(TTcpAcceptor* a, TAtomicSharedPtr<TDeadlineTimer>, const TErrorCode& ec, IHandlingContext&) {
+ if (!ec) {
+ DBGOUT("resume acceptor")
+ StartAccept(a);
+ }
+ }
+
+ void OnRequest(IRequest* r) {
+ try {
+ CB_->OnRequest(r);
+ } catch (...) {
+ Cdbg << CurrentExceptionMessage() << Endl;
+ }
+ }
+
+ protected:
+ void OnCreateConn() noexcept {
+ HttpInConnCounter()->Inc();
+ }
+
+ void OnDestroyConn() noexcept {
+ HttpInConnCounter()->Dec();
+ }
+
+ TDuration GetKeepAliveTimeout() const noexcept {
+ size_t cc = HttpInConnCounter()->Val();
+ TFdLimits lim(*HttpInConnLimits());
+
+ if (!TFdLimits::ExceedLimit(cc, lim.Soft())) {
+ return THttp2Options::ServerInputDeadlineKeepAliveMax;
+ }
+
+ if (cc > lim.Hard()) {
+ cc = lim.Hard();
+ }
+ TDuration::TValue softTuneRange = THttp2Options::ServerInputDeadlineKeepAliveMax.Seconds() - THttp2Options::ServerInputDeadlineKeepAliveMin.Seconds();
+
+ return TDuration::Seconds((softTuneRange * (cc - lim.Soft())) / (lim.Hard() - lim.Soft() + 1)) + THttp2Options::ServerInputDeadlineKeepAliveMin;
+ }
+
+ private:
+ void StartAccept(TTcpAcceptor* a) {
+ TAtomicSharedPtr<TTcpSocket> s(new TTcpSocket(E_.Size() ? E_.GetExecutor().GetIOService() : AcceptExecutor_.GetIOService()));
+ a->AsyncAccept(*s, std::bind(&THttpServer::OnAccept, this, a, s, _1, _2));
+ }
+
+ TIOServiceExecutor AcceptExecutor_;
+ TVector<TTcpAcceptorPtr> A_;
+ TExecutorsPool E_;
+ IOnRequest* CB_;
+
+ public:
+ const i32 LimitRequestsPerConnection;
+ };
+
+ template <class T>
+ class THttp2Protocol: public IProtocol {
+ public:
+ IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
+ return new THttpServer(cb, loc);
+ }
+
+ THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
+ THttpRequest::THandleRef ret(new THttpRequest::THandle(fallback, msg, !ss ? nullptr : new TStatCollector(ss)));
+ try {
+ THttpRequest::Run(ret, msg, &T::Build, T::RequestSettings());
+ } catch (...) {
+ ret->ResetOnRecv();
+ throw;
+ }
+ return ret.Get();
+ }
+
+ THandleRef ScheduleAsyncRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss, bool useAsyncSendRequest) override {
+ THttpRequest::THandleRef ret(new THttpRequest::THandle(fallback, msg, !ss ? nullptr : new TStatCollector(ss)));
+ try {
+ auto requestSettings = T::RequestSettings();
+ requestSettings.SetUseAsyncSendRequest(useAsyncSendRequest);
+ THttpRequest::Run(ret, msg, &T::Build, requestSettings);
+ } catch (...) {
+ ret->ResetOnRecv();
+ throw;
+ }
+ return ret.Get();
+ }
+
+ TStringBuf Scheme() const noexcept override {
+ return T::Name();
+ }
+
+ bool SetOption(TStringBuf name, TStringBuf value) override {
+ return THttp2Options::Set(name, value);
+ }
+ };
+}
+
+namespace NNeh {
+ IProtocol* Http1Protocol() {
+ return Singleton<THttp2Protocol<TRequestGet1>>();
+ }
+ IProtocol* Post1Protocol() {
+ return Singleton<THttp2Protocol<TRequestPost1>>();
+ }
+ IProtocol* Full1Protocol() {
+ return Singleton<THttp2Protocol<TRequestFull1>>();
+ }
+ IProtocol* Http2Protocol() {
+ return Singleton<THttp2Protocol<TRequestGet2>>();
+ }
+ IProtocol* Post2Protocol() {
+ return Singleton<THttp2Protocol<TRequestPost2>>();
+ }
+ IProtocol* Full2Protocol() {
+ return Singleton<THttp2Protocol<TRequestFull2>>();
+ }
+ IProtocol* UnixSocketGetProtocol() {
+ return Singleton<THttp2Protocol<TRequestUnixSocketGet>>();
+ }
+ IProtocol* UnixSocketPostProtocol() {
+ return Singleton<THttp2Protocol<TRequestUnixSocketPost>>();
+ }
+ IProtocol* UnixSocketFullProtocol() {
+ return Singleton<THttp2Protocol<TRequestUnixSocketFull>>();
+ }
+
+ void SetHttp2OutputConnectionsLimits(size_t softLimit, size_t hardLimit) {
+ HttpConnManager()->SetLimits(softLimit, hardLimit);
+ }
+
+ void SetHttp2InputConnectionsLimits(size_t softLimit, size_t hardLimit) {
+ HttpInConnLimits()->SetSoft(softLimit);
+ HttpInConnLimits()->SetHard(hardLimit);
+ }
+
+ TAtomicBase GetHttpOutputConnectionCount() {
+ return HttpOutConnCounter()->Val();
+ }
+
+ std::pair<size_t, size_t> GetHttpOutputConnectionLimits() {
+ return HttpConnManager()->GetLimits();
+ }
+
+ TAtomicBase GetHttpInputConnectionCount() {
+ return HttpInConnCounter()->Val();
+ }
+
+ void SetHttp2InputConnectionsTimeouts(unsigned minSeconds, unsigned maxSeconds) {
+ THttp2Options::ServerInputDeadlineKeepAliveMin = TDuration::Seconds(minSeconds);
+ THttp2Options::ServerInputDeadlineKeepAliveMax = TDuration::Seconds(maxSeconds);
+ }
+
+ class TUnixSocketResolver {
+ public:
+ NDns::TResolvedHost* Resolve(const TString& path) {
+ TString unixSocketPath = path;
+ if (path.size() > 2 && path[0] == '[' && path[path.size() - 1] == ']') {
+ unixSocketPath = path.substr(1, path.size() - 2);
+ }
+
+ if (auto resolvedUnixSocket = ResolvedUnixSockets_.FindPtr(unixSocketPath)) {
+ return resolvedUnixSocket->Get();
+ }
+
+ TNetworkAddress na{TUnixSocketPath(unixSocketPath)};
+ ResolvedUnixSockets_[unixSocketPath] = MakeHolder<NDns::TResolvedHost>(unixSocketPath, na);
+
+ return ResolvedUnixSockets_[unixSocketPath].Get();
+ }
+
+ private:
+ THashMap<TString, THolder<NDns::TResolvedHost>> ResolvedUnixSockets_;
+ };
+
+ TUnixSocketResolver* UnixSocketResolver() {
+ return FastTlsSingleton<TUnixSocketResolver>();
+ }
+
+ const NDns::TResolvedHost* Resolve(const TStringBuf host, ui16 port, NHttp::EResolverType resolverType) {
+ if (resolverType == EResolverType::EUNIXSOCKET) {
+ return UnixSocketResolver()->Resolve(TString(host));
+ }
+ return NDns::CachedResolve(NDns::TResolveInfo(host, port));
+
+ }
+}