diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-09-25 18:13:46 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-09-25 18:47:05 +0300 |
commit | 2d0bcb5ea8b618455e529bd1ab99bfb4e87830c7 (patch) | |
tree | 0bdfcba692d19d3d4fb2044efc5765a46fd323a6 /library/cpp/neh/https.cpp | |
parent | 79c0d84bc00a8f9baccfdb6e0165222d182ab89f (diff) | |
download | ydb-2d0bcb5ea8b618455e529bd1ab99bfb4e87830c7.tar.gz |
Intermediate changes
Diffstat (limited to 'library/cpp/neh/https.cpp')
-rw-r--r-- | library/cpp/neh/https.cpp | 280 |
1 files changed, 122 insertions, 158 deletions
diff --git a/library/cpp/neh/https.cpp b/library/cpp/neh/https.cpp index d75b3a4b10..a5b0920d97 100644 --- a/library/cpp/neh/https.cpp +++ b/library/cpp/neh/https.cpp @@ -374,39 +374,77 @@ namespace NNeh { } class TConnCache; - static TConnCache* SocketCache(); + static TConnCache* ConnectionCache(); class TConnCache: public IThreadFactory::IThreadAble { public: - typedef TAutoLockFreeQueue<TSocketHolder> TConnList; + struct TConnection; typedef TAutoPtr<TSocketHolder> TSocketRef; + typedef THolder<TConnection> TConnectionHolder; + typedef TAutoPtr<TConnectionHolder> TConnectionRef; + typedef TAutoLockFreeQueue<TConnectionHolder> TConnList; struct TConnection { - inline TConnection(TSocketRef& s, bool reUsed, const TResolvedHost* host) noexcept + inline TConnection(TSocketRef& s, const TResolvedHost host) noexcept : Socket(s) - , ReUsed(reUsed) , Host(host) { - SocketCache()->ActiveSockets.Inc(); + ConnectionCache()->ActiveConnections.Inc(); } inline ~TConnection() { + if (!!Socket && IsNotSocketClosedByOtherSide(*Socket)) { + if (!!Ssl) { + ResetBIO(); + // do not wait for shutdown confirmation + Y_UNUSED(SSL_shutdown(Ssl.Get())); + } + } + ConnectionCache()->ActiveConnections.Dec(); + } + + void ResetBIO() { if (!!Socket) { - SocketCache()->ActiveSockets.Dec(); + BIO* bio = BIO_new_socket(*Socket, 0); + SSL_set_bio(Ssl.Get(), bio, bio); } } + bool HasSsl() const { + return Ssl.Get(); + } + + TSslHolder&& MoveSsl() { + return std::move(Ssl); + } + + void SetSsl(TSslHolder&& ssl) { + Ssl = std::move(ssl); + } + + bool ShutdownReceived() { + if (!Ssl) { + return false; + } + char buffer; + int rval = SSL_peek(Ssl.Get(), &buffer, sizeof(buffer)); + if (rval) { + return false; + } + return (SSL_get_shutdown(Ssl.Get()) & SSL_RECEIVED_SHUTDOWN); + } + SOCKET Fd() { return *Socket; } protected: friend class TConnCache; + TSslHolder Ssl; TSocketRef Socket; public: - const bool ReUsed; - const TResolvedHost* Host; + const TResolvedHost Host; }; TConnCache() @@ -428,58 +466,7 @@ namespace NNeh { T_->Join(); } - //used for forwarding filling cache - class TConnector: public IJob { - public: - //create fresh connection - TConnector(const TResolvedHost* host) - : Host_(host) - { - } - - //continue connecting exist socket - TConnector(const TResolvedHost* host, TSocketRef& s) - : Host_(host) - , S_(s) - { - } - - void DoRun(TCont* c) override { - THolder<TConnector> This(this); - - try { - if (!S_) { - TSocketRef res(new TSocketHolder()); - - for (TNetworkAddress::TIterator it = Host_->Addr.Begin(); it != Host_->Addr.End(); ++it) { - int ret = NCoro::ConnectD(c, *res, *it, TDuration::MilliSeconds(300).ToDeadLine()); - - if (!ret) { - TConnection tc(res, false, Host_); - SocketCache()->Release(tc); - return; - } - - if (ret == ECANCELED) { - return; - } - } - } else { - if (!NCoro::PollT(c, *S_, CONT_POLL_WRITE, TDuration::MilliSeconds(300))) { - TConnection tc(S_, false, Host_); - SocketCache()->Release(tc); - } - } - } catch (...) { - } - } - - private: - const TResolvedHost* Host_; - TSocketRef S_; - }; - - TConnection* Connect(TCont* c, const TString& msgAddr, const TResolvedHost* addr, TErrorRef* error) { + TConnectionRef Connect(TCont* c, const TString& msgAddr, const TResolvedHost& addr, TErrorRef* error) { if (ExceedHardLimit()) { if (error) { *error = new TError("neh::https output connections limit reached", TError::TType::UnknownType); @@ -487,28 +474,14 @@ namespace NNeh { return nullptr; } - TSocketRef res; + TConnectionRef res; TConnList& connList = ConnList(addr); while (connList.Dequeue(&res)) { - CachedSockets.Dec(); - - if (IsNotSocketClosedByOtherSide(*res)) { - if (connList.Size() == 0) { - //available connections exhausted - try create yet one (reserve) - TAutoPtr<IJob> job(new TConnector(addr)); - - if (c) { - try { - c->Executor()->Create(*job, "https-con"); - Y_UNUSED(job.Release()); - } catch (...) { - } - } else { - JobQueue()->Schedule(job); - } - } - return new TConnection(res, true, addr); + ActiveConnections.Inc(); + CachedConnections.Dec(); + if (IsNotSocketClosedByOtherSide((*res)->Fd()) && !(*res)->ShutdownReceived()) { + return res; } } @@ -519,46 +492,28 @@ namespace NNeh { return nullptr; } - try { - //run reserve/concurrent connecting - TAutoPtr<IJob> job(new TConnector(addr)); - - c->Executor()->Create(*job, "https-con"); - Y_UNUSED(job.Release()); - } catch (...) { - } - - TNetworkAddress::TIterator ait = addr->Addr.Begin(); - res.Reset(new TSocketHolder(NCoro::Socket(*ait))); const TInstant now(TInstant::Now()); const TInstant deadline(now + TDuration::Seconds(10)); TDuration delay = TDuration::MilliSeconds(8); - TInstant checkpoint = Min(deadline, now + delay); - int ret = NCoro::ConnectD(c, *res, ait->ai_addr, ait->ai_addrlen, checkpoint); + TInstant checkpoint = Min(deadline, delay.ToDeadLine()); + + TNetworkAddress::TIterator ait = addr.Addr.Begin(); + TSocketRef socket(new TSocketHolder(NCoro::Socket(*ait))); + int ret = NCoro::ConnectD(c, *socket, *ait, deadline); + res.Reset(new TConnectionHolder); + res->Reset(new TConnection(socket, addr)); if (ret) { do { if ((ret == ETIMEDOUT || ret == EINTR) && checkpoint < deadline) { delay += delay; checkpoint = Min(deadline, now + delay); - - TSocketRef res2; - + TConnectionRef res2; if (connList.Dequeue(&res2)) { - CachedSockets.Dec(); - - if (IsNotSocketClosedByOtherSide(*res2)) { - try { - TAutoPtr<IJob> job(new TConnector(addr, res)); - - c->Executor()->Create(*job, "https-con"); - Y_UNUSED(job.Release()); - } catch (...) { - } - - res = res2; - - break; + ActiveConnections.Inc(); + CachedConnections.Dec(); + if (IsNotSocketClosedByOtherSide((*res2)->Fd()) && !(*res)->ShutdownReceived()) { + return res2; } } } else { @@ -567,34 +522,30 @@ namespace NNeh { } return nullptr; } - } while (ret = NCoro::PollD(c, *res, CONT_POLL_WRITE, checkpoint)); + } while (ret = NCoro::PollD(c, (*res)->Fd(), CONT_POLL_WRITE, checkpoint)); } - - PrepareSocket(*res); - - return new TConnection(res, false, addr); + PrepareSocket((*res)->Fd()); + return res; } - inline void Release(TConnection& conn) { + inline void Release(TConnectionRef conn) { if (!ExceedHardLimit()) { size_t maxConnId = MaxConnId_.load(std::memory_order_acquire); - while (maxConnId < conn.Host->Id) { + while (maxConnId < (*conn)->Host.Id) { MaxConnId_.compare_exchange_strong( maxConnId, - conn.Host->Id, + (*conn)->Host.Id, std::memory_order_seq_cst, std::memory_order_seq_cst); maxConnId = MaxConnId_.load(std::memory_order_acquire); } - - CachedSockets.Inc(); - ActiveSockets.Dec(); - - ConnList(conn.Host).Enqueue(conn.Socket); + ConnList((*conn)->Host).Enqueue(conn); + CachedConnections.Inc(); + ActiveConnections.Dec(); } - if (CachedSockets.Val() && ExceedSoftLimit()) { + if (CachedConnections.Val() && ExceedSoftLimit()) { SuggestPurgeCache(); } } @@ -609,11 +560,11 @@ namespace NNeh { if (AtomicTryLock(&InPurging_)) { //evaluate the usefulness of purging the cache //если в кеше мало соединений (< MaxConnId_/16 или 64), не чистим кеш - if ((size_t)CachedSockets.Val() > (Min((size_t)MaxConnId_.load(std::memory_order_acquire), (size_t)1024U) >> 4)) { + if ((size_t)CachedConnections.Val() > (Min((size_t)MaxConnId_.load(std::memory_order_acquire), (size_t)1024U) >> 4)) { //по мере приближения к hardlimit нужда в чистке cache приближается к 100% - size_t closenessToHardLimit256 = ((ActiveSockets.Val() + 1) << 8) / (Limits.Delta() + 1); + size_t closenessToHardLimit256 = ((ActiveConnections.Val() + 1) << 8) / (Limits.Delta() + 1); //чем больше соединений в кеше, а не в работе, тем менее нужен кеш (можно его почистить) - size_t cacheUselessness256 = ((CachedSockets.Val() + 1) << 8) / (ActiveSockets.Val() + 1); + size_t cacheUselessness256 = ((CachedConnections.Val() + 1) << 8) / (ActiveConnections.Val() + 1); //итого, - пороги срабатывания: //при достижении soft-limit, если соединения в кеше, а не в работе @@ -647,21 +598,12 @@ namespace NNeh { } } - inline void OnPurgeSocket(ui64& processed) { - CachedSockets.Dec(); - if ((processed++ & 0x3f) == 0) { - //suspend execution every 64 processed socket (clean rate ~= 6400 sockets/sec) - Sleep(TDuration::MilliSeconds(10)); - } - } - void PurgeCache() noexcept { //try remove at least ExceedSoftLimit() oldest connections from cache //вычисляем долю кеша, которую нужно почистить (в 256 долях) (но не менее 1/32 кеша) - size_t frac256 = Min(size_t(Max(size_t(256U / 32U), (ExceedSoftLimit() << 8) / (CachedSockets.Val() + 1))), (size_t)256U); - TSocketRef tmp; + const size_t frac256 = Min<size_t>(256, Max<size_t>(8, (ExceedSoftLimit() << 8) / (CachedConnections.Val() + 1))); + TConnectionRef tmp; - ui64 processed = 0; for (size_t i = 0; i < MaxConnId_.load(std::memory_order_acquire) && !Shutdown_; i++) { TConnList& tc = Lst_.Get(i); if (size_t qsize = tc.Size()) { @@ -669,45 +611,52 @@ namespace NNeh { size_t purgeCounter = ((qsize * frac256) >> 8); if (!purgeCounter && qsize) { - if (qsize <= 2) { - TSocketRef res; + if (qsize == 1) { + // check lifeness + TConnectionRef res; if (tc.Dequeue(&res)) { - if (IsNotSocketClosedByOtherSide(*res)) { + // if connection valid put it back + if (IsNotSocketClosedByOtherSide((*res)->Fd()) && !(*res)->ShutdownReceived()) { tc.Enqueue(res); } else { - OnPurgeSocket(processed); + ActiveConnections.Inc(); + CachedConnections.Dec(); } } } else { + // drop at least one connection from queue with at least 2 connections purgeCounter = 1; } } + while (purgeCounter-- && tc.Dequeue(&tmp)) { - OnPurgeSocket(processed); + ActiveConnections.Inc(); + CachedConnections.Dec(); + tmp->Reset(nullptr); } } } } - inline TConnList& ConnList(const TResolvedHost* addr) { - return Lst_.Get(addr->Id); + inline TConnList& ConnList(const TResolvedHost& addr) { + return Lst_.Get(addr.Id); } - inline size_t TotalSockets() const noexcept { - return ActiveSockets.Val() + CachedSockets.Val(); + inline size_t TotalConnections() const noexcept { + return ActiveConnections.Val() + CachedConnections.Val(); } inline size_t ExceedSoftLimit() const noexcept { - return NHttp::TFdLimits::ExceedLimit(TotalSockets(), Limits.Soft()); + return NHttp::TFdLimits::ExceedLimit(TotalConnections(), Limits.Soft()); } inline size_t ExceedHardLimit() const noexcept { - return NHttp::TFdLimits::ExceedLimit(TotalSockets(), Limits.Hard()); + return NHttp::TFdLimits::ExceedLimit(TotalConnections(), Limits.Hard()); } NHttp::TFdLimits Limits; - TAtomicCounter ActiveSockets; - TAtomicCounter CachedSockets; + TAtomicCounter ActiveConnections; + TAtomicCounter CachedConnections; NHttp::TLockFreeSequence<TConnList> Lst_; @@ -1181,6 +1130,16 @@ namespace NNeh { { } + void SetSsl(TSslHolder&& ssl) { + Ssl_ = std::move(ssl); + BIO_up_ref(*Connection_); + SSL_set_bio(Ssl_.Get(), *Connection_, *Connection_); + } + + TSslHolder&& MoveSsl() { + return std::move(Ssl_); + } + void Handshake() override { Ssl_.Reset(SSL_new(SslCtx_)); if (THttpsOptions::EnableSslClientDebug) { @@ -1242,7 +1201,7 @@ namespace NNeh { //TSslSessionHolder Session_; }; - static TConnCache* SocketCache() { + static TConnCache* ConnectionCache() { return Singleton<TConnCache>(); } @@ -1295,23 +1254,29 @@ namespace NNeh { } TErrorRef error; - THolder<TConnCache::TConnection> s(SocketCache()->Connect(c, Msg_.Addr, Addr_, &error)); - if (!s) { + TConnCache::TConnectionRef connection(ConnectionCache()->Connect(c, Msg_.Addr, *Addr_, &error)); + if (!connection) { Hndl_->NotifyError(error); return; } - TSslClientIOStream io(TSslCtxClient::Instance(), Loc_, s->Fd(), Hndl_->CanceledPtr()); + TSslClientIOStream io(TSslCtxClient::Instance(), Loc_, (*connection)->Fd(), Hndl_->CanceledPtr()); TContBIOWatcher w(io, c); TString received; THttpHeaders headers; TString firstLine; try { - io.Handshake(); + if ((*connection)->HasSsl()) { + io.SetSsl((*connection)->MoveSsl()); + } else { + io.Handshake(); + } RequestData().SendTo(io); Req_.Destroy(); error = ProcessRecv(io, &received, &headers, &firstLine); + (*connection)->SetSsl(io.MoveSsl()); + (*connection)->ResetBIO(); } catch (const TSystemError& e) { if (c->Cancelled() || e.Status() == ECANCELED) { error = new TError("canceled", TError::TType::Cancelled); @@ -1329,8 +1294,7 @@ namespace NNeh { if (error) { Hndl_->NotifyError(error, received, firstLine, headers); } else { - io.Shutdown(); - SocketCache()->Release(*s); + ConnectionCache()->Release(connection); Hndl_->NotifyResponse(received, firstLine, headers); } } @@ -1912,7 +1876,7 @@ namespace NNeh { "invalid output fd limits; hardLimit=%" PRISZT ", softLimit=%" PRISZT, hardLimit, softLimit); - NHttps::SocketCache()->SetFdLimits(softLimit, hardLimit); + NHttps::ConnectionCache()->SetFdLimits(softLimit, hardLimit); } void SetHttpInputConnectionsLimits(size_t softLimit, size_t hardLimit) { |