#include "http2.h"

#include "conn_cache.h"
#include "details.h"
#include "factory.h"
#include "http_common.h"
#include "smart_ptr.h"
#include "utils.h"

#include <library/cpp/http/push_parser/http_parser.h>
#include <library/cpp/http/misc/httpcodes.h>
#include <library/cpp/http/misc/parsed_request.h>
#include <library/cpp/neh/asio/executor.h>

#include <util/generic/singleton.h>
#include <util/generic/vector.h>
#include <util/network/iovec.h>
#include <util/stream/output.h>
#include <util/stream/zlib.h>
#include <util/system/condvar.h>
#include <util/system/mutex.h>
#include <util/system/spinlock.h>
#include <util/system/yassert.h>
#include <util/thread/factory.h>
#include <util/thread/singleton.h>
#include <util/system/sanitizers.h>
#include <util/system/thread.h>

#include <atomic>

#if defined(_unix_)
#include <sys/ioctl.h>
#endif

#if defined(_linux_)
#undef SIOCGSTAMP
#undef SIOCGSTAMPNS
#include <linux/sockios.h>
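// Linux has no FIONWRITE; SIOCOUTQ is the equivalent ioctl:
// bytes still queued (unsent or unacked) in the socket send buffer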
#define FIONWRITE SIOCOUTQ
#endif

//#define DEBUG_HTTP2

#ifdef DEBUG_HTTP2
#define DBGOUT(args) Cout << args << Endl;
#else
#define DBGOUT(args)
#endif

using namespace NDns;
using namespace NAsio;
using namespace NNeh;
using namespace NNeh::NHttp;
using namespace NNeh::NHttp2;
using namespace std::placeholders;

//
// has complex keep-alive references between entities in a multi-threaded environment,
// which creates risks of races, memory leaks, etc.,
// so connecting/disconnecting entities must be done carefully
//
// handler <=-> request <==> connection(socket) <= handlers, stored in io_service
//                           ^
//                           +== connections_cache
// '=>' -- shared/intrusive ptr
// '->' -- weak_ptr
//

static TDuration FixTimeoutForSanitizer(const TDuration timeout) {
    ui64 multiplier = 1;
    if (NSan::ASanIsOn()) {
        // https://github.com/google/sanitizers/wiki/AddressSanitizer
        multiplier = 4;
    } else if (NSan::MSanIsOn()) {
        // via https://github.com/google/sanitizers/wiki/MemorySanitizer
        multiplier = 3;
    } else if (NSan::TSanIsOn()) {
        // via https://clang.llvm.org/docs/ThreadSanitizer.html
        multiplier = 15;
    }

    return TDuration::FromValue(timeout.GetValue() * multiplier);
}

TDuration THttp2Options::ConnectTimeout = FixTimeoutForSanitizer(TDuration::MilliSeconds(1000));
TDuration THttp2Options::InputDeadline = TDuration::Max();
TDuration THttp2Options::OutputDeadline = TDuration::Max();
TDuration THttp2Options::SymptomSlowConnect = FixTimeoutForSanitizer(TDuration::MilliSeconds(10));
size_t THttp2Options::InputBufferSize = 16 * 1024;
bool THttp2Options::KeepInputBufferForCachedConnections = false;
size_t THttp2Options::AsioThreads = 4;
size_t THttp2Options::AsioServerThreads = 4;
bool THttp2Options::EnsureSendingCompleteByAck = false;
int THttp2Options::Backlog = 100;
TDuration THttp2Options::ServerInputDeadline = FixTimeoutForSanitizer(TDuration::MilliSeconds(500));
TDuration THttp2Options::ServerOutputDeadline = TDuration::Max();
TDuration THttp2Options::ServerInputDeadlineKeepAliveMax = FixTimeoutForSanitizer(TDuration::Seconds(120));
TDuration THttp2Options::ServerInputDeadlineKeepAliveMin = FixTimeoutForSanitizer(TDuration::Seconds(10));
bool THttp2Options::ServerUseDirectWrite = false;
bool THttp2Options::UseResponseAsErrorMessage = false;
bool THttp2Options::FullHeadersAsErrorMessage = false;
bool THttp2Options::ErrorDetailsAsResponseBody = false;
bool THttp2Options::RedirectionNotError = false;
bool THttp2Options::AnyResponseIsNotError = false;
bool THttp2Options::TcpKeepAlive = false;
i32 THttp2Options::LimitRequestsPerConnection = -1;
bool THttp2Options::QuickAck = false;
bool THttp2Options::UseAsyncSendRequest = false;
bool THttp2Options::RespectHostInHttpServerNetworkAddress = false;

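//runtime option setter, e.g. THttp2Options::Set("ConnectTimeout", "300ms");
//the value is parsed with FromString<T> for the option's declared type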
bool THttp2Options::Set(TStringBuf name, TStringBuf value) {
#define HTTP2_TRY_SET(optType, optName)       \
    if (name == TStringBuf(#optName)) {       \
        optName = FromString<optType>(value); \
    }

    HTTP2_TRY_SET(TDuration, ConnectTimeout)
    else HTTP2_TRY_SET(TDuration, InputDeadline)
    else HTTP2_TRY_SET(TDuration, OutputDeadline)
    else HTTP2_TRY_SET(TDuration, SymptomSlowConnect)
    else HTTP2_TRY_SET(size_t, InputBufferSize)
    else HTTP2_TRY_SET(bool, KeepInputBufferForCachedConnections)
    else HTTP2_TRY_SET(size_t, AsioThreads)
    else HTTP2_TRY_SET(size_t, AsioServerThreads)
    else HTTP2_TRY_SET(bool, EnsureSendingCompleteByAck)
    else HTTP2_TRY_SET(int, Backlog)
    else HTTP2_TRY_SET(TDuration, ServerInputDeadline)
    else HTTP2_TRY_SET(TDuration, ServerOutputDeadline)
    else HTTP2_TRY_SET(TDuration, ServerInputDeadlineKeepAliveMax)
    else HTTP2_TRY_SET(TDuration, ServerInputDeadlineKeepAliveMin)
    else HTTP2_TRY_SET(bool, ServerUseDirectWrite)
    else HTTP2_TRY_SET(bool, UseResponseAsErrorMessage)
    else HTTP2_TRY_SET(bool, FullHeadersAsErrorMessage)
    else HTTP2_TRY_SET(bool, ErrorDetailsAsResponseBody)
    else HTTP2_TRY_SET(bool, RedirectionNotError)
    else HTTP2_TRY_SET(bool, AnyResponseIsNotError)
    else HTTP2_TRY_SET(bool, TcpKeepAlive)
    else HTTP2_TRY_SET(i32, LimitRequestsPerConnection)
    else HTTP2_TRY_SET(bool, QuickAck)
    else HTTP2_TRY_SET(bool, UseAsyncSendRequest)
    else HTTP2_TRY_SET(bool, RespectHostInHttpServerNetworkAddress)
    else {
        return false;
    }
    return true;
}

namespace NNeh {
    const NDns::TResolvedHost* Resolve(const TStringBuf host, ui16 port, NHttp::EResolverType resolverType);
}

namespace {
//#define DEBUG_STAT

#ifdef DEBUG_STAT
    struct TDebugStat {
        static std::atomic<size_t> ConnTotal;
        static std::atomic<size_t> ConnActive;
        static std::atomic<size_t> ConnCached;
        static std::atomic<size_t> ConnDestroyed;
        static std::atomic<size_t> ConnFailed;
        static std::atomic<size_t> ConnConnCanceled;
        static std::atomic<size_t> ConnSlow;
        static std::atomic<size_t> Conn2Success;
        static std::atomic<size_t> ConnPurgedInCache;
        static std::atomic<size_t> ConnDestroyedInCache;
        static std::atomic<size_t> RequestTotal;
        static std::atomic<size_t> RequestSuccessed;
        static std::atomic<size_t> RequestFailed;
        static void Print() {
            Cout << "ct=" << ConnTotal.load(std::memory_order_acquire)
                 << " ca=" << ConnActive.load(std::memory_order_acquire)
                 << " cch=" << ConnCached.load(std::memory_order_acquire)
                 << " cd=" << ConnDestroyed.load(std::memory_order_acquire)
                 << " cf=" << ConnFailed.load(std::memory_order_acquire)
                 << " ccc=" << ConnConnCanceled.load(std::memory_order_acquire)
                 << " csl=" << ConnSlow.load(std::memory_order_acquire)
                 << " c2s=" << Conn2Success.load(std::memory_order_acquire)
                 << " cpc=" << ConnPurgedInCache.load(std::memory_order_acquire)
                 << " cdc=" << ConnDestroyedInCache.load(std::memory_order_acquire)
                 << " rt=" << RequestTotal.load(std::memory_order_acquire)
                 << " rs=" << RequestSuccessed.load(std::memory_order_acquire)
                 << " rf=" << RequestFailed.load(std::memory_order_acquire)
                 << Endl;
        }
    };
    std::atomic<size_t> TDebugStat::ConnTotal = 0;
    std::atomic<size_t> TDebugStat::ConnActive = 0;
    std::atomic<size_t> TDebugStat::ConnCached = 0;
    std::atomic<size_t> TDebugStat::ConnDestroyed = 0;
    std::atomic<size_t> TDebugStat::ConnFailed = 0;
    std::atomic<size_t> TDebugStat::ConnConnCanceled = 0;
    std::atomic<size_t> TDebugStat::ConnSlow = 0;
    std::atomic<size_t> TDebugStat::Conn2Success = 0;
    std::atomic<size_t> TDebugStat::ConnPurgedInCache = 0;
    std::atomic<size_t> TDebugStat::ConnDestroyedInCache = 0;
    std::atomic<size_t> TDebugStat::RequestTotal = 0;
    std::atomic<size_t> TDebugStat::RequestSuccessed = 0;
    std::atomic<size_t> TDebugStat::RequestFailed = 0;
#endif

    inline void PrepareSocket(SOCKET s, const TRequestSettings& requestSettings = TRequestSettings()) {
        if (requestSettings.NoDelay) {
            SetNoDelay(s, true);
        }
    }

    bool Compress(TData& data, const TString& compressionScheme) {
        if (compressionScheme == "gzip" && data.size() > 23) {  // there is no string less than 24 bytes long that might be compressed with gzip
            try {
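                //compress into a buffer capped at the original size; if the gzipped
                //form would be larger, TMemoryOutput throws and we fall into the catch below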
                TData gzipped(data.size());
                TMemoryOutput out(gzipped.data(), gzipped.size());
                TZLibCompress c(&out, ZLib::GZip);
                c.Write(data.data(), data.size());
                c.Finish();
                gzipped.resize(out.Buf() - gzipped.data());
                data.swap(gzipped);
                return true;
            } catch (yexception&) {
                // gzipped data occupies more space than original data
            }
        }
        return false;
    }

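    //adapts TRequestData to the asio IBuffers interface: keeps the request data
    //alive for the whole (possibly async) write and exposes its parts as one iovec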
    class THttpRequestBuffers: public NAsio::TTcpSocket::IBuffers {
    public:
        THttpRequestBuffers(TRequestData::TPtr rd)
            : Req_(rd)
            , Parts_(Req_->Parts())
            , IOvec_(Parts_.data(), Parts_.size())
        {
        }

        TContIOVector* GetIOvec() override {
            return &IOvec_;
        }

    private:
        TRequestData::TPtr Req_;
        TVector<IOutputStream::TPart> Parts_;
        TContIOVector IOvec_;
    };

    struct TRequestGet1: public TRequestGet {
        static inline TStringBuf Name() noexcept {
            return TStringBuf("http");
        }
    };

    struct TRequestPost1: public TRequestPost {
        static inline TStringBuf Name() noexcept {
            return TStringBuf("post");
        }
    };

    struct TRequestFull1: public TRequestFull {
        static inline TStringBuf Name() noexcept {
            return TStringBuf("full");
        }
    };

    struct TRequestGet2: public TRequestGet {
        static inline TStringBuf Name() noexcept {
            return TStringBuf("http2");
        }
    };

    struct TRequestPost2: public TRequestPost {
        static inline TStringBuf Name() noexcept {
            return TStringBuf("post2");
        }
    };

    struct TRequestFull2: public TRequestFull {
        static inline TStringBuf Name() noexcept {
            return TStringBuf("full2");
        }
    };

    struct TRequestUnixSocketGet: public TRequestGet {
        static inline TStringBuf Name() noexcept {
            return TStringBuf("http+unix");
        }

        static TRequestSettings RequestSettings() {
            return TRequestSettings()
                .SetNoDelay(false)
                .SetResolverType(EResolverType::EUNIXSOCKET);
        }
    };

    struct TRequestUnixSocketPost: public TRequestPost {
        static inline TStringBuf Name() noexcept {
            return TStringBuf("post+unix");
        }

        static TRequestSettings RequestSettings() {
            return TRequestSettings()
                .SetNoDelay(false)
                .SetResolverType(EResolverType::EUNIXSOCKET);
        }
    };

    struct TRequestUnixSocketFull: public TRequestFull {
        static inline TStringBuf Name() noexcept {
            return TStringBuf("full+unix");
        }

        static TRequestSettings RequestSettings() {
            return TRequestSettings()
                .SetNoDelay(false)
                .SetResolverType(EResolverType::EUNIXSOCKET);
        }
    };

    typedef TAutoPtr<THttpRequestBuffers> THttpRequestBuffersPtr;

    class THttpRequest;
    typedef TSharedPtrB<THttpRequest> THttpRequestRef;

    class THttpConn;
    typedef TIntrusivePtr<THttpConn> THttpConnRef;

    typedef std::function<TRequestData::TPtr(const TMessage&, const TParsedLocation&)> TRequestBuilder;

    class THttpRequest {
    public:
        class THandle: public TSimpleHandle {
        public:
            THandle(IOnRecv* f, const TMessage& msg, TStatCollector* s) noexcept
                : TSimpleHandle(f, msg, s)
            {
            }

            bool MessageSendedCompletely() const noexcept override {
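                //the flag is sticky: once the request reports completion we cache it
                //in the handle, so later calls don't need to reach the request at all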
                if (TSimpleHandle::MessageSendedCompletely()) {
                    return true;
                }

                THttpRequestRef req(GetRequest());
                if (!!req && req->RequestSendedCompletely()) {
                    const_cast<THandle*>(this)->SetSendComplete();
                }

                return TSimpleHandle::MessageSendedCompletely();
            }

            void Cancel() noexcept override {
                if (TSimpleHandle::Canceled()) {
                    return;
                }

                THttpRequestRef req(GetRequest());
                if (!!req) {
                    TSimpleHandle::Cancel();
                    req->Cancel();
                }
            }

            void NotifyError(TErrorRef error, const THttpParser* rsp = nullptr) {
#ifdef DEBUG_STAT
                ++TDebugStat::RequestFailed;
#endif
                if (rsp) {
                    TSimpleHandle::NotifyError(error, rsp->DecodedContent(), rsp->FirstLine(), rsp->Headers());
                } else {
                    TSimpleHandle::NotifyError(error);
                }

                ReleaseRequest();
            }

            //not thread safe!
            void SetRequest(const TWeakPtrB<THttpRequest>& r) noexcept {
                Req_ = r;
            }

        private:
            THttpRequestRef GetRequest() const noexcept {
                TGuard<TSpinLock> g(SP_);
                return Req_;
            }

            void ReleaseRequest() noexcept {
                TWeakPtrB<THttpRequest> tmp;
                TGuard<TSpinLock> g(SP_);
                tmp.Swap(Req_);
            }

            mutable TSpinLock SP_;
            TWeakPtrB<THttpRequest> Req_;
        };

        typedef TIntrusivePtr<THandle> THandleRef;

        static void Run(THandleRef& h, const TMessage& msg, TRequestBuilder f, const TRequestSettings& s) {
            THttpRequestRef req(new THttpRequest(h, msg, f, s));
            req->WeakThis_ = req;
            h->SetRequest(req->WeakThis_);
            req->Run(req);
        }

        ~THttpRequest() {
            DBGOUT("~THttpRequest()");
        }

    private:
        THttpRequest(THandleRef& h, TMessage msg, TRequestBuilder f, const TRequestSettings& s)
            : Hndl_(h)
            , RequestBuilder_(f)
            , RequestSettings_(s)
            , Msg_(std::move(msg))
            , Loc_(Msg_.Addr)
            , Addr_(Resolve(Loc_.Host, Loc_.GetPort(), RequestSettings_.ResolverType))
            , AddrIter_(Addr_->Addr.Begin())
            , Canceled_(false)
            , RequestSendedCompletely_(false)
        {
        }

        void Run(THttpRequestRef& req);

    public:
        THttpRequestBuffersPtr BuildRequest() {
            return new THttpRequestBuffers(RequestBuilder_(Msg_, Loc_));
        }

        TRequestSettings RequestSettings() {
            return RequestSettings_;
        }

        //can create a spare socket in an attempt to decrease connecting time
        void OnDetectSlowConnecting();

        //removes the extra connection on successful connect
        void OnConnect(THttpConn* c);

        //some response input has arrived
        void OnBeginRead() noexcept {
            RequestSendedCompletely_ = true;
        }

        void OnResponse(TAutoPtr<THttpParser>& rsp);

        void OnConnectFailed(THttpConn* c, const TErrorCode& ec);
        void OnSystemError(THttpConn* c, const TErrorCode& ec);
        void OnError(THttpConn* c, const TString& errorText);

        bool RequestSendedCompletely() noexcept;

        void Cancel() noexcept;

    private:
        void NotifyResponse(const TString& resp, const TString& firstLine, const THttpHeaders& headers) {
            THandleRef h(ReleaseHandler());
            if (!!h) {
                h->NotifyResponse(resp, firstLine, headers);
            }
        }

        void NotifyError(
            const TString& errorText,
            TError::TType errorType = TError::UnknownType,
            i32 errorCode = 0, i32 systemErrorCode = 0) {
            NotifyError(new TError(errorText, errorType, errorCode, systemErrorCode));
        }

        void NotifyError(TErrorRef error, const THttpParser* rsp = nullptr) {
            THandleRef h(ReleaseHandler());
            if (!!h) {
                h->NotifyError(error, rsp);
            }
        }

        void Finalize(THttpConn* skipConn = nullptr) noexcept;

        inline THandleRef ReleaseHandler() noexcept {
            THandleRef h;
            {
                TGuard<TSpinLock> g(SL_);
                h.Swap(Hndl_);
            }
            return h;
        }

        inline THttpConnRef GetConn() noexcept {
            TGuard<TSpinLock> g(SL_);
            return Conn_;
        }

        inline THttpConnRef ReleaseConn() noexcept {
            THttpConnRef c;
            {
                TGuard<TSpinLock> g(SL_);
                c.Swap(Conn_);
            }
            return c;
        }

        inline THttpConnRef ReleaseConn2() noexcept {
            THttpConnRef c;
            {
                TGuard<TSpinLock> g(SL_);
                c.Swap(Conn2_);
            }
            return c;
        }

        TSpinLock SL_; //guarantees notify() is called only once (prevents a race between the asio thread and the current one)
        THandleRef Hndl_;
        TRequestBuilder RequestBuilder_;
        TRequestSettings RequestSettings_;
        const TMessage Msg_;
        const TParsedLocation Loc_;
        const TResolvedHost* Addr_;
        TNetworkAddress::TIterator AddrIter_;
        THttpConnRef Conn_;
        THttpConnRef Conn2_; //spare concurrent connection, used if slow connecting is detected on the first one
        TWeakPtrB<THttpRequest> WeakThis_;
        TAtomicBool Canceled_;
        TAtomicBool RequestSendedCompletely_;
    };

    TAtomicCounter* HttpOutConnCounter();

    class THttpConn: public TThrRefBase {
    public:
        static THttpConnRef Create(TIOService& srv);

        ~THttpConn() override {
            DBGOUT("~THttpConn()");
            Req_.Reset();
            HttpOutConnCounter()->Dec();
#ifdef DEBUG_STAT
            ++TDebugStat::ConnDestroyed;
#endif
        }

        void StartRequest(THttpRequestRef req, const TEndpoint& ep, size_t addrId, TDuration slowConn) {
            {
                //thread safe linking connection->request
                TGuard<TSpinLock> g(SL_);
                Req_ = req;
            }
            AddrId_ = addrId;
            try {
                TDuration connectDeadline(THttp2Options::ConnectTimeout);
                if (THttp2Options::ConnectTimeout > slowConn) {
                    //use an additional non-fatal connect deadline: on the first timeout we
                    //report slow connecting to THttpRequest and keep waiting out the remaining ConnectDeadline_ period
                    connectDeadline = slowConn;
                    ConnectDeadline_ = THttp2Options::ConnectTimeout - slowConn;
                }
                DBGOUT("AsyncConnect to " << ep.IpToString());
                AS_.AsyncConnect(ep, std::bind(&THttpConn::OnConnect, THttpConnRef(this), _1, _2), connectDeadline);
            } catch (...) {
                ReleaseRequest();
                throw;
            }
        }

        //start next request on keep-alive connection
        bool StartNextRequest(THttpRequestRef& req) {
            if (Finalized_) {
                return false;
            }

            {
                //thread safe linking connection->request
                TGuard<TSpinLock> g(SL_);
                Req_ = req;
            }

            RequestWritten_ = false;
            BeginReadResponse_ = false;

            try {
                TErrorCode ec;
                SendRequest(req->BuildRequest(), ec); //throw std::bad_alloc
                if (ec.Value() == ECANCELED) {
                    OnCancel();
                } else if (ec) {
                    OnError(ec);
                }
            } catch (...) {
                OnError(CurrentExceptionMessage());
                throw;
            }
            return true;
        }

        //a connection received from the cache must be validated before use
        //(removing closed connections from the cache takes some time)
        inline bool IsValid() const noexcept {
            return !Finalized_;
        }

        void SetCached(bool v) noexcept {
            Cached_ = v;
        }

        void Close() noexcept {
            try {
                Cancel();
            } catch (...) {
            }
        }

        void DetachRequest() noexcept {
            ReleaseRequest();
        }

        void Cancel() { //throw std::bad_alloc
            if (!Canceled_) {
                Canceled_ = true;
                Finalized_ = true;
                OnCancel();
                AS_.AsyncCancel();
            }
        }

        void OnCancel() {
            THttpRequestRef r(ReleaseRequest());
            if (!!r) {
                static const TString reqCanceled("request canceled");
                r->OnError(this, reqCanceled);
            }
        }

        bool RequestSendedCompletely() const noexcept {
            DBGOUT("RequestSendedCompletely()");
            if (!Connected_ || !RequestWritten_) {
                return false;
            }
            if (BeginReadResponse_) {
                return true;
            }
#if defined(FIONWRITE)
            if (THttp2Options::EnsureSendingCompleteByAck) {
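                //FIONWRITE (SIOCOUTQ on linux) reports bytes still sitting in the socket
                //send queue; 0 means the whole request has been sent and acknowledged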
                int nbytes = Max<int>();
                int err = ioctl(AS_.Native(), FIONWRITE, &nbytes);
                return err ? false : nbytes == 0;
            }
#endif
            return true;
        }

        TIOService& GetIOService() const noexcept {
            return AS_.GetIOService();
        }

    private:
        THttpConn(TIOService& srv)
            : AddrId_(0)
            , AS_(srv)
            , BuffSize_(THttp2Options::InputBufferSize)
            , Connected_(false)
            , Cached_(false)
            , Canceled_(false)
            , Finalized_(false)
            , InAsyncRead_(false)
            , RequestWritten_(false)
            , BeginReadResponse_(false)
        {
            HttpOutConnCounter()->Inc();
        }

        //can be called only from asio
        void OnConnect(const TErrorCode& ec, IHandlingContext& ctx) {
            DBGOUT("THttpConn::OnConnect: " << ec.Value());
            if (Y_UNLIKELY(ec)) {
                if (ec.Value() == ETIMEDOUT && ConnectDeadline_.GetValue()) {
                    //detected slow connecting (final timeout not reached yet)
                    DBGOUT("OnConnectTimingCheck");
                    THttpRequestRef req(GetRequest());
                    if (!req) {
                        return; //a cancel from the client thread can get ahead of us
                    }
                    TDuration newDeadline(ConnectDeadline_);
                    ConnectDeadline_ = TDuration::Zero(); //next timeout is final

                    req->OnDetectSlowConnecting();
                    //continue wait connect
                    ctx.ContinueUseHandler(newDeadline);

                    return;
                }
#ifdef DEBUG_STAT
                if (ec.Value() != ECANCELED) {
                    ++TDebugStat::ConnFailed;
                } else {
                    ++TDebugStat::ConnConnCanceled;
                }
#endif
                if (ec.Value() == EIO) {
                    //try to get more detailed error info
                    char buf[1];
                    TErrorCode errConnect;
                    AS_.ReadSome(buf, 1, errConnect);
                    OnConnectFailed(errConnect.Value() ? errConnect : ec);
                } else if (ec.Value() == ECANCELED) {
                    // don't try connecting to the next host IP addr, simply fail
                    OnError(ec);
                } else {
                    OnConnectFailed(ec);
                }
            } else {
                Connected_ = true;

                THttpRequestRef req(GetRequest());
                if (!req || Canceled_) {
                    return;
                }

                try {
                    PrepareSocket(AS_.Native(), req->RequestSettings());
                    if (THttp2Options::TcpKeepAlive) {
                        SetKeepAlive(AS_.Native(), true);
                    }
                } catch (TSystemError& err) {
                    TErrorCode ec2(err.Status());
                    OnError(ec2);
                    return;
                }

                req->OnConnect(this);

                THttpRequestBuffersPtr ptr(req->BuildRequest());
                PrepareParser();

                TErrorCode ec3;
                SendRequest(ptr, ec3);
                if (ec3) {
                    OnError(ec3);
                }
            }
        }

        void PrepareParser() {
            Prs_ = new THttpParser();
            Prs_->Prepare();
        }

        void SendRequest(const THttpRequestBuffersPtr& bfs, TErrorCode& ec) { //throw std::bad_alloc
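            //fast path: try a synchronous write first; if the socket accepts everything
            //we can start reading immediately, otherwise (or in forced-async mode) fall back to AsyncWrite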
            if (!THttp2Options::UseAsyncSendRequest) {
                size_t amount = AS_.WriteSome(*bfs->GetIOvec(), ec);

                if (ec && ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK && ec.Value() != EINPROGRESS) {
                    return;
                }
                ec.Assign(0);

                bfs->GetIOvec()->Proceed(amount);

                if (bfs->GetIOvec()->Complete()) {
                    RequestWritten_ = true;
                    StartRead();
                } else {
                    SendRequestAsync(bfs);
                }
            } else {
                SendRequestAsync(bfs);
            }
        }

        void SendRequestAsync(const THttpRequestBuffersPtr& bfs) {
            NAsio::TTcpSocket::TSendedData sd(bfs.Release());
            AS_.AsyncWrite(sd, std::bind(&THttpConn::OnWrite, THttpConnRef(this), _1, _2, _3), THttp2Options::OutputDeadline);
        }

        void OnWrite(const TErrorCode& err, size_t amount, IHandlingContext& ctx) {
            Y_UNUSED(amount);
            Y_UNUSED(ctx);
            if (err) {
                OnError(err);
            } else {
                DBGOUT("OnWrite()");
                RequestWritten_ = true;
                StartRead();
            }
        }

        inline void StartRead() {
            if (!InAsyncRead_ && !Canceled_) {
                InAsyncRead_ = true;
                AS_.AsyncPollRead(std::bind(&THttpConn::OnCanRead, THttpConnRef(this), _1, _2), THttp2Options::InputDeadline);
            }
        }

        //can be called only from asio
        void OnReadSome(const TErrorCode& err, size_t bytes, IHandlingContext& ctx) {
            if (Y_UNLIKELY(err)) {
                OnError(err);
                return;
            }
            if (!BeginReadResponse_) {
                //used in MessageSendedCompletely()
                BeginReadResponse_ = true;
                THttpRequestRef r(GetRequest());
                if (!!r) {
                    r->OnBeginRead();
                }
            }
            DBGOUT("receive:" << TStringBuf(Buff_.Get(), bytes));
            try {
                if (!Prs_) {
                    throw yexception() << TStringBuf("received some data while not in a request");
                }

#if defined(_linux_)
                if (THttp2Options::QuickAck) {
                    SetSockOpt(AS_.Native(), SOL_TCP, TCP_QUICKACK, (int)1);
                }
#endif

                DBGOUT("parse:");
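                //if the parser consumed the full buffer, more data is likely already
                //available, so try a synchronous read before re-arming the async handler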
                while (!Prs_->Parse(Buff_.Get(), bytes)) {
                    if (BuffSize_ == bytes) {
                        TErrorCode ec;
                        bytes = AS_.ReadSome(Buff_.Get(), BuffSize_, ec);

                        if (!ec) {
                            continue;
                        }

                        if (ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK) {
                            OnError(ec);

                            return;
                        }
                    }
                    //continue async. read from socket
                    ctx.ContinueUseHandler(THttp2Options::InputDeadline);

                    return;
                }

                //successfully reached the end of the http response
                THttpRequestRef r(ReleaseRequest());
                if (!r) {
                    //lost the race against request canceling
                    DBGOUT("connection failed");
                    return;
                }

                DBGOUT("response:");
                bool keepALive = Prs_->IsKeepAlive();

                r->OnResponse(Prs_);

                if (!keepALive) {
                    return;
                }

                //continue use connection (keep-alive mode)
                PrepareParser();

                if (!THttp2Options::KeepInputBufferForCachedConnections) {
                    Buff_.Destroy();
                }
                //continue async. read from socket
                ctx.ContinueUseHandler(THttp2Options::InputDeadline);

                PutSelfToCache();
            } catch (...) {
                OnError(CurrentExceptionMessage());
            }
        }

        void PutSelfToCache();

        //reacts to input data on a re-used keep-alive connection,
        //whose buffer was freed/released when the connection was placed in the cache
        void OnCanRead(const TErrorCode& err, IHandlingContext& ctx) {
            if (Y_UNLIKELY(err)) {
                OnError(err);
            } else {
                if (!Buff_) {
                    Buff_.Reset(new char[BuffSize_]);
                }
                TErrorCode ec;
                OnReadSome(ec, AS_.ReadSome(Buff_.Get(), BuffSize_, ec), ctx);
            }
        }

        //thread-safe access to and unlinking of the current request
        inline THttpRequestRef GetRequest() noexcept {
            TGuard<TSpinLock> g(SL_);
            return Req_;
        }

        inline THttpRequestRef ReleaseRequest() noexcept {
            THttpRequestRef r;
            {
                TGuard<TSpinLock> g(SL_);
                r.Swap(Req_);
            }
            return r;
        }

        void OnConnectFailed(const TErrorCode& ec);

        inline void OnError(const TErrorCode& ec) {
            OnError(ec.Text());
        }

        inline void OnError(const TString& errText);

        size_t AddrId_;
        NAsio::TTcpSocket AS_;
        TArrayHolder<char> Buff_; //input buffer
        const size_t BuffSize_;

        TAutoPtr<THttpParser> Prs_; //input data parser & parsed info storage

        TSpinLock SL_;
        THttpRequestRef Req_; //current request
        TDuration ConnectDeadline_;
        TAtomicBool Connected_;
        TAtomicBool Cached_;
        TAtomicBool Canceled_;
        TAtomicBool Finalized_;

        bool InAsyncRead_;
        TAtomicBool RequestWritten_;
        TAtomicBool BeginReadResponse_;
    };

    //monitors connection limits, purges the cache, and owns the asio threads/executors used by http clients
    class THttpConnManager: public IThreadFactory::IThreadAble {
    public:
        THttpConnManager()
            : TotalConn(0)
            , EP_(THttp2Options::AsioThreads)
            , InPurging_(0)
            , MaxConnId_(0)
            , Shutdown_(false)
        {
            T_ = SystemThreadFactory()->Run(this);
            Limits.SetSoft(40000);
            Limits.SetHard(50000);
        }

        ~THttpConnManager() override {
            {
                TGuard<TMutex> g(PurgeMutex_);

                Shutdown_ = true;
                CondPurge_.Signal();
            }

            EP_.SyncShutdown();

            T_->Join();
        }

        inline void SetLimits(size_t softLimit, size_t hardLimit) noexcept {
            Limits.SetSoft(softLimit);
            Limits.SetHard(hardLimit);
        }

        inline std::pair<size_t, size_t> GetLimits() const noexcept {
            return {Limits.Soft(), Limits.Hard()};
        }

        inline void CheckLimits() {
            if (ExceedSoftLimit()) {
                SuggestPurgeCache();

                if (ExceedHardLimit()) {
                    Y_ABORT("neh::http2 output connections limit reached");
                    //ythrow yexception() << "neh::http2 output connections limit reached";
                }
            }
        }

        inline bool Get(THttpConnRef& conn, size_t addrId) {
#ifdef DEBUG_STAT
            TDebugStat::ConnTotal.store(TotalConn.Val(), std::memory_order_release);
            TDebugStat::ConnActive.store(Active(), std::memory_order_release);
            TDebugStat::ConnCached.store(Cache_.Size(), std::memory_order_release);
#endif
            return Cache_.Get(conn, addrId);
        }

        inline void Put(THttpConnRef& conn, size_t addrId) {
            if (Y_LIKELY(!Shutdown_ && !ExceedHardLimit() && !CacheDisabled())) {
                if (Y_UNLIKELY(addrId > (size_t)AtomicGet(MaxConnId_))) {
                    AtomicSet(MaxConnId_, addrId);
                }
                Cache_.Put(conn, addrId);
            } else {
                conn->Close();
                conn.Drop();
            }
        }

        inline size_t OnConnError(size_t addrId) {
            return Cache_.Validate(addrId);
        }

        TIOService& GetIOService() {
            return EP_.GetExecutor().GetIOService();
        }

        bool CacheDisabled() const {
            return Limits.Soft() == 0;
        }

        bool IsShutdown() const noexcept {
            return Shutdown_;
        }

        TAtomicCounter TotalConn;

    private:
        inline size_t Total() const noexcept {
            return TotalConn.Val();
        }

        inline size_t Active() const noexcept {
            return TFdLimits::ExceedLimit(Total(), Cache_.Size());
        }

        inline size_t ExceedSoftLimit() const noexcept {
            return TFdLimits::ExceedLimit(Total(), Limits.Soft());
        }

        inline size_t ExceedHardLimit() const noexcept {
            return TFdLimits::ExceedLimit(Total(), Limits.Hard());
        }

        void SuggestPurgeCache() {
            if (AtomicTryLock(&InPurging_)) {
                //evaluate the usefulness of purging the cache:
                //if the cache holds few connections (< MaxConnId_/16, capped at 64), don't purge it
                if (Cache_.Size() > (Min((size_t)AtomicGet(MaxConnId_), (size_t)1024U) >> 4)) {
                    //as we approach the hard limit, the need to purge the cache approaches 100%
                    size_t closenessToHardLimit256 = ((Active() + 1) << 8) / (Limits.Delta() + 1);
                    //the more connections sit in the cache rather than in use, the less the cache is needed (so it can be purged)
                    size_t cacheUselessness256 = ((Cache_.Size() + 1) << 8) / (Active() + 1);

                    //resulting trigger thresholds:
                    //at the soft limit: if the connections sit in the cache rather than in use
                    //halfway from the soft limit to the hard limit: if more than half of the connections are cached
                    //near the hard limit: attempt to purge the cache almost constantly
                    if ((closenessToHardLimit256 + cacheUselessness256) >= 256U) {
                        TGuard<TMutex> g(PurgeMutex_);

                        CondPurge_.Signal();
                        return; //memo: thread MUST unlock InPurging_ (see DoExecute())
                    }
                }
                AtomicUnlock(&InPurging_);
            }
        }

        void DoExecute() override {
            TThread::SetCurrentThreadName("NehHttpConnMngr");
            while (true) {
                {
                    TGuard<TMutex> g(PurgeMutex_);

                    if (Shutdown_)
                        return;

                    CondPurge_.WaitI(PurgeMutex_);
                }

                PurgeCache();

                AtomicUnlock(&InPurging_);
            }
        }

        void PurgeCache() noexcept {
            //try to remove at least ExceedSoftLimit() oldest connections from the cache:
            //compute the fraction of the cache to purge (in 256ths, but no less than 1/32 of the cache)
            size_t frac256 = Min(size_t(Max(size_t(256U / 32U), (ExceedSoftLimit() << 8) / (Cache_.Size() + 1))), (size_t)256U);

            size_t processed = 0;
            size_t maxConnId = AtomicGet(MaxConnId_);
            for (size_t i = 0; i <= maxConnId && !Shutdown_; ++i) {
                processed += Cache_.Purge(i, frac256);
                if (processed > 32) {
#ifdef DEBUG_STAT
                    TDebugStat::ConnPurgedInCache += processed;
#endif
                    processed = 0;
                    Sleep(TDuration::MilliSeconds(10)); //prevent big spike cpu/system usage
                }
            }
        }

        TFdLimits Limits;
        TExecutorsPool EP_;

        TConnCache<THttpConn> Cache_;
        TAtomic InPurging_;
        TAtomic MaxConnId_;

        TAutoPtr<IThreadFactory::IThread> T_;
        TCondVar CondPurge_;
        TMutex PurgeMutex_;
        TAtomicBool Shutdown_;
    };

    THttpConnManager* HttpConnManager() {
        return Singleton<THttpConnManager>();
    }

    TAtomicCounter* HttpOutConnCounter() {
        return &HttpConnManager()->TotalConn;
    }

    THttpConnRef THttpConn::Create(TIOService& srv) {
        if (HttpConnManager()->IsShutdown()) {
            throw yexception() << "can't create connection with a shut down service";
        }

        return new THttpConn(srv);
    }

    void THttpConn::PutSelfToCache() {
        THttpConnRef c(this);
        HttpConnManager()->Put(c, AddrId_);
    }

    void THttpConn::OnConnectFailed(const TErrorCode& ec) {
        THttpRequestRef r(GetRequest());
        if (!!r) {
            r->OnConnectFailed(this, ec);
        }
        OnError(ec);
    }

    void THttpConn::OnError(const TString& errText) {
        Finalized_ = true;
        if (Connected_) {
            Connected_ = false;
            TErrorCode ec;
            AS_.Shutdown(NAsio::TTcpSocket::ShutdownBoth, ec);
        } else {
            if (AS_.IsOpen()) {
                AS_.AsyncCancel();
            }
        }
        THttpRequestRef r(ReleaseRequest());
        if (!!r) {
            r->OnError(this, errText);
        } else {
            if (Cached_) {
                size_t res = HttpConnManager()->OnConnError(AddrId_);
                Y_UNUSED(res);
#ifdef DEBUG_STAT
                TDebugStat::ConnDestroyedInCache += res;
#endif
            }
        }
    }

    void THttpRequest::Run(THttpRequestRef& req) {
#ifdef DEBUG_STAT
        if ((++TDebugStat::RequestTotal & 0xFFF) == 0) {
            TDebugStat::Print();
        }
#endif
        try {
            while (!Canceled_) {
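                //the loop repeats only when a cached connection turns out to be dead;
                //a freshly created connection is attempted once and then we break out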
                THttpConnRef conn;
                if (HttpConnManager()->Get(conn, Addr_->Id)) {
                    DBGOUT("Use connection from cache");
                    Conn_ = conn; //thread magic
                    if (!conn->StartNextRequest(req)) {
                        continue; //a cached connection may already be dead: ignore the write error and try another one
                    }
                } else {
                    HttpConnManager()->CheckLimits(); //aborts the process if the hard limit is reached (shutdown/atexit state is handled in Create())
                    Conn_ = THttpConn::Create(HttpConnManager()->GetIOService());
                    TEndpoint ep(new NAddr::TAddrInfo(&*AddrIter_));
                    Conn_->StartRequest(req, ep, Addr_->Id, THttp2Options::SymptomSlowConnect); // can throw
                }
                break;
            }
        } catch (...) {
            Conn_.Reset();
            throw;
        }
    }

    //it seems we have lost a TCP SYN packet; create an extra connection to decrease response time
    void THttpRequest::OnDetectSlowConnecting() {
#ifdef DEBUG_STAT
        ++TDebugStat::ConnSlow;
#endif
        //use the io_service (Run() thread-executor) of the first conn. for better thread safety
        THttpConnRef conn = GetConn();

        if (!conn) {
            return;
        }

        THttpConnRef conn2;
        try {
            conn2 = THttpConn::Create(conn->GetIOService());
        } catch (...) {
            return; // can't create a spare connection, simply continue using only the main one
        }

        {
            TGuard<TSpinLock> g(SL_);
            Conn2_ = conn2;
        }

        if (Y_UNLIKELY(Canceled_)) {
            ReleaseConn2();
        } else {
            //use the full connect timeout to disable slow-connect detection on the second conn.
            TEndpoint ep(new NAddr::TAddrInfo(&*Addr_->Addr.Begin()));
            try {
                conn2->StartRequest(WeakThis_, ep, Addr_->Id, THttp2Options::ConnectTimeout);
            } catch (...) {
                // ignore errors on spare connection
                ReleaseConn2();
            }
        }
    }

    void THttpRequest::OnConnect(THttpConn* c) {
        THttpConnRef extraConn;
        {
            TGuard<TSpinLock> g(SL_);
            if (Y_UNLIKELY(!!Conn2_)) {
                //has a paired concurrent conn, 'should stay only one'
                if (Conn2_.Get() == c) {
#ifdef DEBUG_STAT
                    ++TDebugStat::Conn2Success;
#endif
                    Conn2_.Swap(Conn_);
                }
                extraConn.Swap(Conn2_);
            }
        }
        if (!!extraConn) {
            extraConn->DetachRequest(); //prevent call OnError()
            extraConn->Close();
        }
    }

    void THttpRequest::OnResponse(TAutoPtr<THttpParser>& rsp) {
        DBGOUT("THttpRequest::OnResponse()");
        ReleaseConn();
        if (Y_LIKELY(((rsp->RetCode() >= 200 && rsp->RetCode() < (!THttp2Options::RedirectionNotError ? 300 : 400)) || THttp2Options::AnyResponseIsNotError))) {
            NotifyResponse(rsp->DecodedContent(), rsp->FirstLine(), rsp->Headers());
        } else {
            TString message;

            if (THttp2Options::FullHeadersAsErrorMessage) {
                TStringStream err;
                err << rsp->FirstLine();

                THttpHeaders hdrs = rsp->Headers();
                for (auto h = hdrs.begin(); h < hdrs.end(); h++) {
                    err << h->ToString() << TStringBuf("\r\n");
                }

                message = err.Str();
            } else if (THttp2Options::UseResponseAsErrorMessage) {
                message = rsp->DecodedContent();
            } else {
                TStringStream err;
                err << TStringBuf("request failed(") << rsp->FirstLine() << TStringBuf(")");
                message = err.Str();
            }

            NotifyError(new TError(message, TError::ProtocolSpecific, rsp->RetCode()), rsp.Get());
        }
    }

    void THttpRequest::OnConnectFailed(THttpConn* c, const TErrorCode& ec) {
        DBGOUT("THttpRequest::OnConnectFailed()");
        //detach/discard the failed conn, try connecting to the next IP addr (if possible)
        THttpConnRef cc(GetConn());
        if (c != cc.Get() || AddrIter_ == Addr_->Addr.End() || ++AddrIter_ == Addr_->Addr.End() || Canceled_) {
            return OnSystemError(c, ec);
        }
        // can try next host addr
        c->DetachRequest();
        c->Close();
        THttpConnRef nextConn;
        try {
            nextConn = THttpConn::Create(HttpConnManager()->GetIOService());
        } catch (...) {
            OnSystemError(nullptr, ec);
            return;
        }
        {
            THttpConnRef nc = nextConn;
            TGuard<TSpinLock> g(SL_);
            Conn_.Swap(nc);
        }
        TEndpoint ep(new NAddr::TAddrInfo(&*AddrIter_));
        try {
            nextConn->StartRequest(WeakThis_, ep, Addr_->Id, THttp2Options::SymptomSlowConnect);
        } catch (...) {
            OnError(nullptr, CurrentExceptionMessage());
            return;
        }

        if (Canceled_) {
            OnError(nullptr, "canceled");
        }
    }

    void THttpRequest::OnSystemError(THttpConn* c, const TErrorCode& ec) {
        DBGOUT("THttpRequest::OnSystemError()");
        NotifyError(ec.Text(), TError::TType::UnknownType, 0, ec.Value());
        Finalize(c);
    }

    void THttpRequest::OnError(THttpConn* c, const TString& errorText) {
        DBGOUT("THttpRequest::OnError()");
        NotifyError(errorText);
        Finalize(c);
    }

    bool THttpRequest::RequestSendedCompletely() noexcept {
        if (RequestSendedCompletely_) {
            return true;
        }

        THttpConnRef c(GetConn());
        return !!c ? c->RequestSendedCompletely() : false;
    }

    void THttpRequest::Cancel() noexcept {
        if (!Canceled_) {
            Canceled_ = true;
            try {
                static const TString canceled("Canceled");
                NotifyError(canceled, TError::Cancelled);
                Finalize();
            } catch (...) {
            }
        }
    }

    inline void FinalizeConn(THttpConnRef& c, THttpConn* skipConn) noexcept {
        if (!!c && c.Get() != skipConn) {
            c->DetachRequest();
            c->Close();
        }
    }

    void THttpRequest::Finalize(THttpConn* skipConn) noexcept {
        THttpConnRef c1(ReleaseConn());
        FinalizeConn(c1, skipConn);
        THttpConnRef c2(ReleaseConn2());
        FinalizeConn(c2, skipConn);
    }

    /////////////////////////////////// server side ////////////////////////////////////

    TAtomicCounter* HttpInConnCounter() {
        return Singleton<TAtomicCounter>();
    }

    TFdLimits* HttpInConnLimits() {
        return Singleton<TFdLimits>();
    }

    class THttpServer: public IRequester {
        typedef TAutoPtr<TTcpAcceptor> TTcpAcceptorPtr;
        typedef TAtomicSharedPtr<TTcpSocket> TTcpSocketRef;
        class TConn;
        typedef TSharedPtrB<TConn> TConnRef;

        class TRequest: public IHttpRequest {
        public:
            TRequest(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser>& p)
                : C_(c)
                , P_(p)
                , RemoteHost_(C_->RemoteHost())
                , CompressionScheme_(P_->GetBestCompressionScheme())
                , H_(TStringBuf(P_->FirstLine()))
            {
            }

            ~TRequest() override {
                if (!!C_) {
                    try {
                        C_->SendError(Id(), 503, "service unavailable (request ignored)", P_->HttpVersion(), {});
                    } catch (...) {
                        DBGOUT("~TRequest()::SendFail() exception");
                    }
                }
            }

            TAtomicBase Id() const {
                return Id_;
            }

        protected:
            TStringBuf Scheme() const override {
                return TStringBuf("http");
            }

            TString RemoteHost() const override {
                return RemoteHost_;
            }

            TStringBuf Service() const override {
                return TStringBuf(H_.Path).Skip(1);
            }

            const THttpHeaders& Headers() const override {
                return P_->Headers();
            }

            TStringBuf Method() const override {
                return H_.Method;
            }

            TStringBuf Body() const override {
                return P_->DecodedContent();
            }

            TStringBuf Cgi() const override {
                return H_.Cgi;
            }

            TStringBuf RequestId() const override {
                return TStringBuf();
            }

            bool Canceled() const override {
                if (!C_) {
                    return false;
                }
                return C_->IsCanceled();
            }

            void SendReply(TData& data) override {
                SendReply(data, TString(), HttpCodes::HTTP_OK);
            }

            void SendReply(TData& data, const TString& headers, int httpCode) override {
                if (!!C_) {
                    C_->Send(Id(), data, CompressionScheme_, P_->HttpVersion(), headers, httpCode);
                    C_.Reset();
                }
            }

            void SendError(TResponseError err, const THttpErrorDetails& details) override {
                static const unsigned errorToHttpCode[IRequest::MaxResponseError] = {
                    400,
                    403,
                    404,
                    429,
                    500,
                    501,
                    502,
                    503,
                    509,
                };

                if (!!C_) {
                    C_->SendError(Id(), errorToHttpCode[err], details.Details, P_->HttpVersion(), details.Headers);
                    C_.Reset();
                }
            }

            static TAtomicBase NextId() {
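                //ids must be non-zero: 0 is used as the 'no primary response in flight'
                //marker for PrimaryResponse_ (see OnRequest)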
                static TAtomic idGenerator = 0;
                TAtomicBase id = 0;
                do {
                    id = AtomicIncrement(idGenerator);
                } while (!id);
                return id;
            }

            TSharedPtrB<TConn> C_;
            TAutoPtr<THttpParser> P_;
            TString RemoteHost_;
            TString CompressionScheme_;
            TParsedHttpFull H_;
            TAtomicBase Id_ = NextId();
        };

        class TRequestGet: public TRequest {
        public:
            TRequestGet(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser> p)
                : TRequest(c, p)
            {
            }

            TStringBuf Data() const override {
                return H_.Cgi;
            }
        };

        class TRequestPost: public TRequest {
        public:
            TRequestPost(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser> p)
                : TRequest(c, p)
            {
            }

            TStringBuf Data() const override {
                return P_->DecodedContent();
            }
        };

        class TConn {
        private:
            TConn(THttpServer& hs, const TTcpSocketRef& s)
                : HS_(hs)
                , AS_(s)
                , RemoteHost_(NNeh::PrintHostByRfc(*AS_->RemoteEndpoint().Addr()))
                , BuffSize_(THttp2Options::InputBufferSize)
                , Buff_(new char[BuffSize_])
                , Canceled_(false)
                , LeftRequestsToDisconnect_(hs.LimitRequestsPerConnection)
            {
                DBGOUT("THttpServer::TConn()");
                HS_.OnCreateConn();
            }

            inline TConnRef SelfRef() noexcept {
                return WeakThis_;
            }

        public:
            static void Create(THttpServer& hs, const TTcpSocketRef& s) {
                TSharedPtrB<TConn> conn(new TConn(hs, s));
                conn->WeakThis_ = conn;
                conn->ExpectNewRequest();
                conn->AS_->AsyncPollRead(std::bind(&TConn::OnCanRead, conn, _1, _2), THttp2Options::ServerInputDeadline);
            }

            ~TConn() {
                DBGOUT("~THttpServer::TConn(" << (!AS_ ? -666 : AS_->Native()) << ")");
                HS_.OnDestroyConn();
            }

        private:
            void ExpectNewRequest() {
                P_.Reset(new THttpParser(THttpParser::Request));
                P_->Prepare();
            }

            void OnCanRead(const TErrorCode& ec, IHandlingContext& ctx) {
                if (ec) {
                    OnError();
                } else {
                    TErrorCode ec2;
                    OnReadSome(ec2, AS_->ReadSome(Buff_.Get(), BuffSize_, ec2), ctx);
                }
            }

            void OnError() {
                DBGOUT("Srv OnError(" << (!AS_ ? -666 : AS_->Native()) << ")");
                Canceled_ = true;
                AS_->AsyncCancel();
            }

            void OnReadSome(const TErrorCode& ec, size_t amount, IHandlingContext& ctx) {
                if (ec || !amount) {
                    OnError();

                    return;
                }

                DBGOUT("ReadSome(" << (!AS_ ? -666 : AS_->Native()) << "): " << amount);
                try {
                    size_t buffPos = 0;
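                    //one read may contain several pipelined requests; buffPos tracks
                    //how far into the buffer the parser has already consumed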
                    //DBGOUT("receive and parse: " << TStringBuf(Buff_.Get(), amount));
                    while (P_->Parse(Buff_.Get() + buffPos, amount - buffPos)) {
                        if (!P_->IsKeepAlive() || LeftRequestsToDisconnect_ == 1) {
                            SeenMessageWithoutKeepalive_ = true;
                        }

                        char rt = *P_->FirstLine().data();
                        const size_t extraDataSize = P_->GetExtraDataSize();
                        if (rt == 'P' || rt == 'p') {
                            OnRequest(new TRequestPost(WeakThis_, P_));
                        } else {
                            OnRequest(new TRequestGet(WeakThis_, P_));
                        }
                        if (extraDataSize) {
                            // has http pipelining
                            buffPos = amount - extraDataSize;
                            ExpectNewRequest();
                        } else {
                            ExpectNewRequest();
                            ctx.ContinueUseHandler(HS_.GetKeepAliveTimeout());
                            return;
                        }
                    }
                    ctx.ContinueUseHandler(THttp2Options::ServerInputDeadline);
                } catch (...) {
                    OnError();
                }
            }

            void OnRequest(TRequest* r) {
                DBGOUT("OnRequest()");
                if (AtomicGet(PrimaryResponse_)) {
                    // has pipelining
                    PipelineOrder_.Enqueue(r->Id());
                } else {
                    AtomicSet(PrimaryResponse_, r->Id());
                }
                HS_.OnRequest(r);
                OnRequestDone();
            }

            void OnRequestDone() {
                DBGOUT("OnRequestDone()");
                if (LeftRequestsToDisconnect_ > 0) {
                    --LeftRequestsToDisconnect_;
                }
            }

            static void PrintHttpVersion(IOutputStream& out, const THttpVersion& ver) {
                out << TStringBuf("HTTP/") << ver.Major << TStringBuf(".") << ver.Minor;
            }

            struct TResponseData : TThrRefBase {
                TResponseData(size_t reqId, TTcpSocket::TSendedData data)
                    : RequestId_(reqId)
                    , Data_(data)
                {
                }

                size_t RequestId_;
                TTcpSocket::TSendedData Data_;
            };
            typedef TIntrusivePtr<TResponseData> TResponseDataRef;

        public:
            //not thread-safe (called from an outside thread)
            void Send(TAtomicBase requestId, TData& data, const TString& compressionScheme, const THttpVersion& ver, const TString& headers, int httpCode) {
                class THttpResponseFormatter {
                public:
                    THttpResponseFormatter(TData& theData, const TString& contentEncoding, const THttpVersion& theVer, const TString& theHeaders, int theHttpCode, bool closeConnection) {
                        Header.Reserve(128 + contentEncoding.size() + theHeaders.size());
                        PrintHttpVersion(Header, theVer);
                        Header << TStringBuf(" ") << theHttpCode << ' ' << HttpCodeStr(theHttpCode);
                        if (Compress(theData, contentEncoding)) {
                            Header << TStringBuf("\r\nContent-Encoding: ") << contentEncoding;
                        }
                        Header << TStringBuf("\r\nContent-Length: ") << theData.size();
                        if (closeConnection) {
                            Header << TStringBuf("\r\nConnection: close");
                        } else if (Y_LIKELY(theVer.Major > 1 || theVer.Minor > 0)) {
                            // since HTTP/1.1 Keep-Alive is default behaviour
                            Header << TStringBuf("\r\nConnection: Keep-Alive");
                        }
                        if (theHeaders) {
                            Header << theHeaders;
                        }
                        Header << TStringBuf("\r\n\r\n");

                        Body.swap(theData);

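                        //zero-copy handoff: the body was swapped out of the caller and is
                        //sent together with the header as a two-part iovec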
                        Parts[0].buf = Header.Data();
                        Parts[0].len = Header.Size();
                        Parts[1].buf = Body.data();
                        Parts[1].len = Body.size();
                    }

                    TStringStream Header;
                    TData Body;
                    IOutputStream::TPart Parts[2];
                };

                class TBuffers: public THttpResponseFormatter, public TTcpSocket::IBuffers {
                public:
                    TBuffers(TData& theData, const TString& contentEncoding, const THttpVersion& theVer, const TString& theHeaders, int theHttpCode, bool closeConnection)
                        : THttpResponseFormatter(theData, contentEncoding, theVer, theHeaders, theHttpCode, closeConnection)
                        , IOVec(Parts, 2)
                    {
                    }

                    TContIOVector* GetIOvec() override {
                        return &IOVec;
                    }

                    TContIOVector IOVec;
                };

                TTcpSocket::TSendedData sd(new TBuffers(data, compressionScheme, ver, headers, httpCode, SeenMessageWithoutKeepalive_));
                SendData(requestId, sd);
            }

            //called from an outside thread; not thread-safe
            void SendError(TAtomicBase requestId, unsigned httpCode, const TString& descr, const THttpVersion& ver, const TString& headers) {
                if (Canceled_) {
                    return;
                }

                class THttpErrorResponseFormatter {
                public:
                    THttpErrorResponseFormatter(unsigned theHttpCode, const TString& theDescr, const THttpVersion& theVer, bool closeConnection, const TString& headers) {
                        PrintHttpVersion(Answer, theVer);
                        Answer << TStringBuf(" ") << theHttpCode << TStringBuf(" ");
                        if (theDescr.size() && !THttp2Options::ErrorDetailsAsResponseBody) {
                            // Reason-Phrase = *<TEXT, excluding CR, LF>
                            // replace forbidden characters with '.'
                            TString reasonPhrase = theDescr;
                            for (TString::iterator it = reasonPhrase.begin(); it != reasonPhrase.end(); ++it) {
                                char& ch = *it;
                                if (ch == ' ') {
                                    continue;
                                }
                                if (((ch & 31) == ch) || static_cast<unsigned>(ch) == 127 || (static_cast<unsigned>(ch) & 0x80)) {
                                    // CTLs (0..31), DEL (127), or non-ASCII (high bit set)
                                    ch = '.';
                                }
                            }
                            Answer << reasonPhrase;
                        } else {
                            Answer << HttpCodeStr(static_cast<int>(theHttpCode));
                        }

                        if (closeConnection) {
                            Answer << TStringBuf("\r\nConnection: close");
                        }

                        if (headers) {
                            Answer << "\r\n" << headers;
                        }

                        if (THttp2Options::ErrorDetailsAsResponseBody) {
                            Answer << TStringBuf("\r\nContent-Length:") << theDescr.size() << "\r\n\r\n" << theDescr;
                        } else {
                            Answer << "\r\n"
                                      "Content-Length:0\r\n\r\n"sv;
                        }

                        Parts[0].buf = Answer.Data();
                        Parts[0].len = Answer.Size();
                    }

                    TStringStream Answer;
                    IOutputStream::TPart Parts[1];
                };

                class TBuffers: public THttpErrorResponseFormatter, public TTcpSocket::IBuffers {
                public:
                    TBuffers(
                        unsigned theHttpCode,
                        const TString& theDescr,
                        const THttpVersion& theVer,
                        bool closeConnection,
                        const TString& headers
                    )
                        : THttpErrorResponseFormatter(theHttpCode, theDescr, theVer, closeConnection, headers)
                        , IOVec(Parts, 1)
                    {
                    }

                    TContIOVector* GetIOvec() override {
                        return &IOVec;
                    }

                    TContIOVector IOVec;
                };

                TTcpSocket::TSendedData sd(new TBuffers(httpCode, descr, ver, SeenMessageWithoutKeepalive_, headers));
                SendData(requestId, sd);
            }

            void ProcessPipeline() {
                // called after the current (PrimaryResponse_) request has been answered successfully
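                // either promote the next queued request id to PrimaryResponse_ (CAS
                // loop), or clear PrimaryResponse_ when nothing is pipelined; the
                // Y_ABORT_UNLESS calls assert that nobody else changes it concurrently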
                TAtomicBase requestId;
                if (PipelineOrder_.Dequeue(&requestId)) {
                    TAtomicBase oldReqId;
                    do {
                        oldReqId = AtomicGet(PrimaryResponse_);
                        Y_ABORT_UNLESS(oldReqId, "race inside http pipelining");
                    } while (!AtomicCas(&PrimaryResponse_, requestId, oldReqId));

                    ProcessResponsesData();
                } else {
                    TAtomicBase oldReqId = AtomicGet(PrimaryResponse_);
                    if (oldReqId) {
                        while (!AtomicCas(&PrimaryResponse_, 0, oldReqId)) {
                            Y_ABORT_UNLESS(oldReqId == AtomicGet(PrimaryResponse_), "race inside http pipelining [2]");
                        }
                    }
                }
            }

            void ProcessResponsesData() {
                // drain the responses-data queue and, if the response for the current
                // PrimaryResponse_ has already arrived, start sending it
                TResponseDataRef rd;
                while (ResponsesDataQueue_.Dequeue(&rd)) {
                    ResponsesData_[rd->RequestId_] = rd;
                }
                TAtomicBase requestId = AtomicGet(PrimaryResponse_);
                if (requestId) {
                    THashMap<TAtomicBase, TResponseDataRef>::iterator it = ResponsesData_.find(requestId);
                    if (it != ResponsesData_.end()) {
                        // has next primary response
                        rd = it->second;
                        ResponsesData_.erase(it);
                        AS_->AsyncWrite(rd->Data_, std::bind(&TConn::OnSend, SelfRef(), _1, _2, _3), THttp2Options::ServerOutputDeadline);
                    }
                }
            }

        private:
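            // Sends the response right away if requestId is the current primary
            // response; otherwise parks it in ResponsesDataQueue_ until its turn.
            // With ServerUseDirectWrite, a synchronous WriteSome() is tried first and
            // AsyncWrite() only picks up whatever remains unsent.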
            void SendData(TAtomicBase requestId, TTcpSocket::TSendedData sd) {
                TContIOVector& vec = *sd->GetIOvec();

                if (requestId != AtomicGet(PrimaryResponse_)) {
                    // another request must be answered first, so queue this response
                    // and post an event so the queue is re-checked on the local/transport thread
                    TResponseDataRef rdr = new TResponseData(requestId, sd);
                    ResponsesDataQueue_.Enqueue(rdr);
                    AS_->GetIOService().Post(std::bind(&TConn::ProcessResponsesData, SelfRef()));
                    return;
                }
                if (THttp2Options::ServerUseDirectWrite) {
                    vec.Proceed(AS_->WriteSome(vec));
                }
                if (!vec.Complete()) {
                    DBGOUT("AsyncWrite()");
                    AS_->AsyncWrite(sd, std::bind(&TConn::OnSend, SelfRef(), _1, _2, _3), THttp2Options::ServerOutputDeadline);
                } else {
                    // run ProcessPipeline on a safe (transport) thread
                    AS_->GetIOService().Post(std::bind(&TConn::ProcessPipeline, SelfRef()));
                }
            }

            void OnSend(const TErrorCode& ec, size_t amount, IHandlingContext&) {
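                // on success, advance the pipeline; if this response carried
                // "Connection: close", also shut the socket down in both directions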
                Y_UNUSED(amount);
                if (ec) {
                    OnError();
                } else {
                    ProcessPipeline();
                }

                if (SeenMessageWithoutKeepalive_) {
                    TErrorCode shutdownEc;
                    AS_->Shutdown(TTcpSocket::ShutdownBoth, shutdownEc);
                }
            }

        public:
            bool IsCanceled() const noexcept {
                return Canceled_;
            }

            const TString& RemoteHost() const noexcept {
                return RemoteHost_;
            }

        private:
            TWeakPtrB<TConn> WeakThis_;
            THttpServer& HS_;
            TTcpSocketRef AS_;
            TString RemoteHost_;
            size_t BuffSize_;
            TArrayHolder<char> Buff_;
            TAutoPtr<THttpParser> P_;
            // pipelining support
            TAtomic PrimaryResponse_ = 0;
            TLockFreeQueue<TAtomicBase> PipelineOrder_;
            TLockFreeQueue<TResponseDataRef> ResponsesDataQueue_;
            THashMap<TAtomicBase, TResponseDataRef> ResponsesData_;

            TAtomicBool Canceled_;
            TAtomicBool SeenMessageWithoutKeepalive_ = false;

            i32 LeftRequestsToDisconnect_ = -1;
        };

        ///////////////////////////////////////////////////////////

    public:
        THttpServer(IOnRequest* cb, const TParsedLocation& loc)
            : E_(THttp2Options::AsioServerThreads)
            , CB_(cb)
            , LimitRequestsPerConnection(THttp2Options::LimitRequestsPerConnection)
        {
            TNetworkAddress addr = THttp2Options::RespectHostInHttpServerNetworkAddress
                                       ? TNetworkAddress(TString(loc.Host), loc.GetPort())
                                       : TNetworkAddress(loc.GetPort());

            for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) {
                TEndpoint ep(new NAddr::TAddrInfo(&*it));
                TTcpAcceptorPtr a(new TTcpAcceptor(AcceptExecutor_.GetIOService()));
                DBGOUT("bind:" << ep.IpToString() << ":" << ep.Port());
                a->Bind(ep);
                a->Listen(THttp2Options::Backlog);
                StartAccept(a.Get());
                A_.push_back(a);
            }
        }

        ~THttpServer() override {
            AcceptExecutor_.SyncShutdown(); //cancel operations on all current sockets (including acceptors)
            A_.clear();                     //stop listening
            E_.SyncShutdown();
        }

        void OnAccept(TTcpAcceptor* a, TAtomicSharedPtr<TTcpSocket> s, const TErrorCode& ec, IHandlingContext&) {
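            // ECANCELED means the acceptor is shutting down; running out of fds or
            // memory (EMFILE/ENFILE/ENOMEM/ENOBUFS) suspends accepting for 30 seconds
            // via a deadline timer; other errors are logged and accepting continues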
            if (Y_UNLIKELY(ec)) {
                if (ec.Value() == ECANCELED) {
                    return;
                } else if (ec.Value() == EMFILE || ec.Value() == ENFILE || ec.Value() == ENOMEM || ec.Value() == ENOBUFS) {
                    //hit an OS resource limit; suspend accepting for a while
                    TAtomicSharedPtr<TDeadlineTimer> dt(new TDeadlineTimer(a->GetIOService()));
                    dt->AsyncWaitExpireAt(TDuration::Seconds(30), std::bind(&THttpServer::OnTimeoutSuspendAccept, this, a, dt, _1, _2));
                    return;
                } else {
                    Cdbg << "acc: " << ec.Text() << Endl;
                }
            } else {
                if (static_cast<size_t>(HttpInConnCounter()->Val()) < HttpInConnLimits()->Hard()) {
                    try {
                        SetNonBlock(s->Native());
                        PrepareSocket(s->Native());
                        TConn::Create(*this, s);
                    } catch (TSystemError& err) {
                        TErrorCode ec2(err.Status());
                        Cdbg << "acc: " << ec2.Text() << Endl;
                    }
                } //otherwise the accepted socket will be closed
            }
            StartAccept(a); //continue accepting
        }

        void OnTimeoutSuspendAccept(TTcpAcceptor* a, TAtomicSharedPtr<TDeadlineTimer>, const TErrorCode& ec, IHandlingContext&) {
            if (!ec) {
                DBGOUT("resume acceptor")
                StartAccept(a);
            }
        }

        void OnRequest(IRequest* r) {
            try {
                CB_->OnRequest(r);
            } catch (...) {
                Cdbg << CurrentExceptionMessage() << Endl;
            }
        }

    protected:
        void OnCreateConn() noexcept {
            HttpInConnCounter()->Inc();
        }

        void OnDestroyConn() noexcept {
            HttpInConnCounter()->Dec();
        }

        TDuration GetKeepAliveTimeout() const noexcept {
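            // scale the keep-alive timeout linearly from ServerInputDeadlineKeepAliveMax
            // down to ...Min as the connection count moves from the soft to the hard fd
            // limit; e.g. with hypothetical limits soft=100, hard=200, min=10s, max=120s
            // and cc=150 connections: 10s + (110 * 50) / 101 = 10s + 54s = 64s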
            size_t cc = HttpInConnCounter()->Val();
            TFdLimits lim(*HttpInConnLimits());

            if (!TFdLimits::ExceedLimit(cc, lim.Soft())) {
                return THttp2Options::ServerInputDeadlineKeepAliveMax;
            }

            if (cc > lim.Hard()) {
                cc = lim.Hard();
            }
            TDuration::TValue softTuneRange = THttp2Options::ServerInputDeadlineKeepAliveMax.Seconds() - THttp2Options::ServerInputDeadlineKeepAliveMin.Seconds();

            return TDuration::Seconds((softTuneRange * (cc - lim.Soft())) / (lim.Hard() - lim.Soft() + 1)) + THttp2Options::ServerInputDeadlineKeepAliveMin;
        }

    private:
        void StartAccept(TTcpAcceptor* a) {
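            // hand new sockets to the worker executor pool when it has threads;
            // otherwise keep them on the accept executor's io_service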
            TAtomicSharedPtr<TTcpSocket> s(new TTcpSocket(E_.Size() ? E_.GetExecutor().GetIOService() : AcceptExecutor_.GetIOService()));
            a->AsyncAccept(*s, std::bind(&THttpServer::OnAccept, this, a, s, _1, _2));
        }

        TIOServiceExecutor AcceptExecutor_;
        TVector<TTcpAcceptorPtr> A_;
        TExecutorsPool E_;
        IOnRequest* CB_;

    public:
        const i32 LimitRequestsPerConnection;
    };

    template <class T>
    class THttp2Protocol: public IProtocol {
    public:
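        // T (e.g. TRequestGet1) supplies the static hooks used below: Name() for the
        // scheme string, Build() and RequestSettings() for building outgoing requests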
        IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
            return new THttpServer(cb, loc);
        }

        THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
            THttpRequest::THandleRef ret(new THttpRequest::THandle(fallback, msg, !ss ? nullptr : new TStatCollector(ss)));
            try {
                THttpRequest::Run(ret, msg, &T::Build, T::RequestSettings());
            } catch (...) {
                ret->ResetOnRecv();
                throw;
            }
            return ret.Get();
        }

        TStringBuf Scheme() const noexcept override {
            return T::Name();
        }

        bool SetOption(TStringBuf name, TStringBuf value) override {
            return THttp2Options::Set(name, value);
        }
    };
}

namespace NNeh {
    IProtocol* Http1Protocol() {
        return Singleton<THttp2Protocol<TRequestGet1>>();
    }
    IProtocol* Post1Protocol() {
        return Singleton<THttp2Protocol<TRequestPost1>>();
    }
    IProtocol* Full1Protocol() {
        return Singleton<THttp2Protocol<TRequestFull1>>();
    }
    IProtocol* Http2Protocol() {
        return Singleton<THttp2Protocol<TRequestGet2>>();
    }
    IProtocol* Post2Protocol() {
        return Singleton<THttp2Protocol<TRequestPost2>>();
    }
    IProtocol* Full2Protocol() {
        return Singleton<THttp2Protocol<TRequestFull2>>();
    }
    IProtocol* UnixSocketGetProtocol() {
        return Singleton<THttp2Protocol<TRequestUnixSocketGet>>();
    }
    IProtocol* UnixSocketPostProtocol() {
        return Singleton<THttp2Protocol<TRequestUnixSocketPost>>();
    }
    IProtocol* UnixSocketFullProtocol() {
        return Singleton<THttp2Protocol<TRequestUnixSocketFull>>();
    }

    void SetHttp2OutputConnectionsLimits(size_t softLimit, size_t hardLimit) {
        HttpConnManager()->SetLimits(softLimit, hardLimit);
    }

    void SetHttp2InputConnectionsLimits(size_t softLimit, size_t hardLimit) {
        HttpInConnLimits()->SetSoft(softLimit);
        HttpInConnLimits()->SetHard(hardLimit);
    }

    TAtomicBase GetHttpOutputConnectionCount() {
        return HttpOutConnCounter()->Val();
    }

    std::pair<size_t, size_t> GetHttpOutputConnectionLimits() {
        return HttpConnManager()->GetLimits();
    }

    TAtomicBase GetHttpInputConnectionCount() {
        return HttpInConnCounter()->Val();
    }

    void SetHttp2InputConnectionsTimeouts(unsigned minSeconds, unsigned maxSeconds) {
        THttp2Options::ServerInputDeadlineKeepAliveMin = TDuration::Seconds(minSeconds);
        THttp2Options::ServerInputDeadlineKeepAliveMax = TDuration::Seconds(maxSeconds);
    }

    class TUnixSocketResolver {
    public:
        NDns::TResolvedHost* Resolve(const TString& path) {
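            // accepts both "/path/to.sock" and the bracketed form "[/path/to.sock]"
            // used inside URLs; brackets, when present, are stripped before resolving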
            TString unixSocketPath = path;
            if (path.size() > 2 && path[0] == '[' && path[path.size() - 1] == ']') {
                unixSocketPath = path.substr(1, path.size() - 2);
            }

            if (auto resolvedUnixSocket = ResolvedUnixSockets_.FindPtr(unixSocketPath)) {
                return resolvedUnixSocket->Get();
            }

            TNetworkAddress na{TUnixSocketPath(unixSocketPath)};
            ResolvedUnixSockets_[unixSocketPath] = MakeHolder<NDns::TResolvedHost>(unixSocketPath, na);

            return ResolvedUnixSockets_[unixSocketPath].Get();
        }

    private:
        THashMap<TString, THolder<NDns::TResolvedHost>> ResolvedUnixSockets_;
    };

    TUnixSocketResolver* UnixSocketResolver() {
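        // FastTlsSingleton returns a per-thread instance, so the cache map inside
        // TUnixSocketResolver is only ever touched by one thread and needs no locking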
        return FastTlsSingleton<TUnixSocketResolver>();
    }

    const NDns::TResolvedHost* Resolve(const TStringBuf host, ui16 port, NHttp::EResolverType resolverType) {
        if (resolverType == EResolverType::EUNIXSOCKET) {
            return UnixSocketResolver()->Resolve(TString(host));
        }
        return NDns::CachedResolve(NDns::TResolveInfo(host, port));
    }
}