diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-09-14 21:37:20 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-09-14 21:53:33 +0300 |
commit | 59dbf512fa4bb44d4873c2dd926eb95a57895472 (patch) | |
tree | c192d027d0b3b13b988947b91e63a2a961793cde | |
parent | c021d71a2bce52d110b473b1c0f224664fea5163 (diff) | |
download | ydb-59dbf512fa4bb44d4873c2dd926eb95a57895472.tar.gz |
Intermediate changes
-rw-r--r-- | library/cpp/neh/https.cpp | 162 |
1 files changed, 63 insertions, 99 deletions
diff --git a/library/cpp/neh/https.cpp b/library/cpp/neh/https.cpp index 8a592bd141..d75b3a4b10 100644 --- a/library/cpp/neh/https.cpp +++ b/library/cpp/neh/https.cpp @@ -374,66 +374,39 @@ namespace NNeh { } class TConnCache; - static TConnCache* ConnectionCache(); + static TConnCache* SocketCache(); class TConnCache: public IThreadFactory::IThreadAble { public: - struct TConnection; + typedef TAutoLockFreeQueue<TSocketHolder> TConnList; typedef TAutoPtr<TSocketHolder> TSocketRef; - typedef THolder<TConnection> TConnectionHolder; - typedef TAutoPtr<TConnectionHolder> TConnectionRef; - typedef TAutoLockFreeQueue<TConnectionHolder> TConnList; struct TConnection { - inline TConnection(TSocketRef& s, const TResolvedHost host) noexcept + inline TConnection(TSocketRef& s, bool reUsed, const TResolvedHost* host) noexcept : Socket(s) + , ReUsed(reUsed) , Host(host) { - ConnectionCache()->ActiveSockets.Inc(); + SocketCache()->ActiveSockets.Inc(); } inline ~TConnection() { if (!!Socket) { - if (!!Ssl) { - // gracefull shutdown - BIO* bio = BIO_new_socket(*Socket, 0); - SSL_set_bio(Ssl.Get(), bio, bio); - for (size_t i = 0; i < 2; ++i) { - bool rval = SSL_shutdown(Ssl.Get()); - if (0 == rval) { - continue; - } else if (1 == rval) { - break; - } - } - } - ConnectionCache()->ActiveSockets.Dec(); + SocketCache()->ActiveSockets.Dec(); } } - bool HasSsl() const { - return Ssl.Get(); - } - - TSslHolder&& MoveSsl() { - return std::move(Ssl); - } - - void SetSsl(TSslHolder&& ssl) { - Ssl = std::move(ssl); - } - SOCKET Fd() { return *Socket; } protected: friend class TConnCache; - TSslHolder Ssl; TSocketRef Socket; public: - const TResolvedHost Host; + const bool ReUsed; + const TResolvedHost* Host; }; TConnCache() @@ -459,15 +432,15 @@ namespace NNeh { class TConnector: public IJob { public: //create fresh connection - TConnector(const TResolvedHost host) + TConnector(const TResolvedHost* host) : Host_(host) { } //continue connecting exist socket - TConnector(const TResolvedHost host, TConnectionRef& connection) + TConnector(const TResolvedHost* host, TSocketRef& s) : Host_(host) - , Connection_(connection) + , S_(s) { } @@ -475,16 +448,15 @@ namespace NNeh { THolder<TConnector> This(this); try { - if (!Connection_) { - TSocketRef socket(new TSocketHolder()); + if (!S_) { + TSocketRef res(new TSocketHolder()); - for (TNetworkAddress::TIterator it = Host_.Addr.Begin(); it != Host_.Addr.End(); ++it) { - int ret = NCoro::ConnectD(c, *socket, *it, TDuration::MilliSeconds(300).ToDeadLine()); + for (TNetworkAddress::TIterator it = Host_->Addr.Begin(); it != Host_->Addr.End(); ++it) { + int ret = NCoro::ConnectD(c, *res, *it, TDuration::MilliSeconds(300).ToDeadLine()); if (!ret) { - TConnectionRef res(new TConnectionHolder); - res->Reset(new TConnection(socket, Host_)); - ConnectionCache()->Release(res); + TConnection tc(res, false, Host_); + SocketCache()->Release(tc); return; } @@ -493,9 +465,9 @@ namespace NNeh { } } } else { - if (!NCoro::PollT(c, (*Connection_)->Fd(), CONT_POLL_WRITE, TDuration::MilliSeconds(300))) { - ConnectionCache()->ActiveSockets.Inc(); - ConnectionCache()->Release(Connection_); + if (!NCoro::PollT(c, *S_, CONT_POLL_WRITE, TDuration::MilliSeconds(300))) { + TConnection tc(S_, false, Host_); + SocketCache()->Release(tc); } } } catch (...) { @@ -503,11 +475,11 @@ namespace NNeh { } private: - const TResolvedHost Host_; - TConnectionRef Connection_; + const TResolvedHost* Host_; + TSocketRef S_; }; - TConnectionRef Connect(TCont* c, const TString& msgAddr, const TResolvedHost& addr, TErrorRef* error) { + TConnection* Connect(TCont* c, const TString& msgAddr, const TResolvedHost* addr, TErrorRef* error) { if (ExceedHardLimit()) { if (error) { *error = new TError("neh::https output connections limit reached", TError::TType::UnknownType); @@ -515,15 +487,17 @@ namespace NNeh { return nullptr; } - TConnectionRef res; + TSocketRef res; TConnList& connList = ConnList(addr); while (connList.Dequeue(&res)) { - ActiveSockets.Inc(); CachedSockets.Dec(); - if (IsNotSocketClosedByOtherSide((*res)->Fd())) { + + if (IsNotSocketClosedByOtherSide(*res)) { if (connList.Size() == 0) { + //available connections exhausted - try create yet one (reserve) TAutoPtr<IJob> job(new TConnector(addr)); + if (c) { try { c->Executor()->Create(*job, "https-con"); @@ -534,7 +508,7 @@ namespace NNeh { JobQueue()->Schedule(job); } } - return res; + return new TConnection(res, true, addr); } } @@ -554,34 +528,36 @@ namespace NNeh { } catch (...) { } + TNetworkAddress::TIterator ait = addr->Addr.Begin(); + res.Reset(new TSocketHolder(NCoro::Socket(*ait))); const TInstant now(TInstant::Now()); const TInstant deadline(now + TDuration::Seconds(10)); TDuration delay = TDuration::MilliSeconds(8); - TInstant checkpoint = Min(deadline, delay.ToDeadLine()); - - TNetworkAddress::TIterator ait = addr.Addr.Begin(); - TSocketRef socket(new TSocketHolder(NCoro::Socket(*ait))); - int ret = NCoro::ConnectD(c, *socket, *ait, deadline); - res.Reset(new TConnectionHolder); - res->Reset(new TConnection(socket, addr)); + TInstant checkpoint = Min(deadline, now + delay); + int ret = NCoro::ConnectD(c, *res, ait->ai_addr, ait->ai_addrlen, checkpoint); if (ret) { do { if ((ret == ETIMEDOUT || ret == EINTR) && checkpoint < deadline) { delay += delay; checkpoint = Min(deadline, now + delay); - TConnectionRef res2; + + TSocketRef res2; + if (connList.Dequeue(&res2)) { - ActiveSockets.Inc(); CachedSockets.Dec(); - if (IsNotSocketClosedByOtherSide((*res2)->Fd())) { + + if (IsNotSocketClosedByOtherSide(*res2)) { try { TAutoPtr<IJob> job(new TConnector(addr, res)); + c->Executor()->Create(*job, "https-con"); Y_UNUSED(job.Release()); } catch (...) { } + res = res2; + break; } } @@ -591,20 +567,22 @@ namespace NNeh { } return nullptr; } - } while (ret = NCoro::PollD(c, (*res)->Fd(), CONT_POLL_WRITE, checkpoint)); + } while (ret = NCoro::PollD(c, *res, CONT_POLL_WRITE, checkpoint)); } - PrepareSocket((*res)->Fd()); - return res; + + PrepareSocket(*res); + + return new TConnection(res, false, addr); } - inline void Release(TConnectionRef conn) { + inline void Release(TConnection& conn) { if (!ExceedHardLimit()) { size_t maxConnId = MaxConnId_.load(std::memory_order_acquire); - while (maxConnId < (*conn)->Host.Id) { + while (maxConnId < conn.Host->Id) { MaxConnId_.compare_exchange_strong( maxConnId, - (*conn)->Host.Id, + conn.Host->Id, std::memory_order_seq_cst, std::memory_order_seq_cst); maxConnId = MaxConnId_.load(std::memory_order_acquire); @@ -613,7 +591,7 @@ namespace NNeh { CachedSockets.Inc(); ActiveSockets.Dec(); - ConnList((*conn)->Host).Enqueue(conn); + ConnList(conn.Host).Enqueue(conn.Socket); } if (CachedSockets.Val() && ExceedSoftLimit()) { @@ -681,7 +659,7 @@ namespace NNeh { //try remove at least ExceedSoftLimit() oldest connections from cache //вычисляем долю кеша, которую нужно почистить (в 256 долях) (но не менее 1/32 кеша) size_t frac256 = Min(size_t(Max(size_t(256U / 32U), (ExceedSoftLimit() << 8) / (CachedSockets.Val() + 1))), (size_t)256U); - TConnectionRef tmp; + TSocketRef tmp; ui64 processed = 0; for (size_t i = 0; i < MaxConnId_.load(std::memory_order_acquire) && !Shutdown_; i++) { @@ -692,9 +670,9 @@ namespace NNeh { if (!purgeCounter && qsize) { if (qsize <= 2) { - TConnectionRef res; + TSocketRef res; if (tc.Dequeue(&res)) { - if (IsNotSocketClosedByOtherSide((*res)->Fd())) { + if (IsNotSocketClosedByOtherSide(*res)) { tc.Enqueue(res); } else { OnPurgeSocket(processed); @@ -711,8 +689,8 @@ namespace NNeh { } } - inline TConnList& ConnList(const TResolvedHost& addr) { - return Lst_.Get(addr.Id); + inline TConnList& ConnList(const TResolvedHost* addr) { + return Lst_.Get(addr->Id); } inline size_t TotalSockets() const noexcept { @@ -1203,16 +1181,6 @@ namespace NNeh { { } - void SetSsl(TSslHolder&& ssl) { - Ssl_ = std::move(ssl); - BIO_up_ref(*Connection_); - SSL_set_bio(Ssl_.Get(), *Connection_, *Connection_); - } - - TSslHolder&& MoveSsl() { - return std::move(Ssl_); - } - void Handshake() override { Ssl_.Reset(SSL_new(SslCtx_)); if (THttpsOptions::EnableSslClientDebug) { @@ -1274,7 +1242,7 @@ namespace NNeh { //TSslSessionHolder Session_; }; - static TConnCache* ConnectionCache() { + static TConnCache* SocketCache() { return Singleton<TConnCache>(); } @@ -1327,24 +1295,20 @@ namespace NNeh { } TErrorRef error; - TConnCache::TConnectionRef connection(ConnectionCache()->Connect(c, Msg_.Addr, *Addr_, &error)); - if (!connection) { + THolder<TConnCache::TConnection> s(SocketCache()->Connect(c, Msg_.Addr, Addr_, &error)); + if (!s) { Hndl_->NotifyError(error); return; } - TSslClientIOStream io(TSslCtxClient::Instance(), Loc_, (*connection)->Fd(), Hndl_->CanceledPtr()); + TSslClientIOStream io(TSslCtxClient::Instance(), Loc_, s->Fd(), Hndl_->CanceledPtr()); TContBIOWatcher w(io, c); TString received; THttpHeaders headers; TString firstLine; try { - if ((*connection)->HasSsl()) { - io.SetSsl((*connection)->MoveSsl()); - } else { - io.Handshake(); - } + io.Handshake(); RequestData().SendTo(io); Req_.Destroy(); error = ProcessRecv(io, &received, &headers, &firstLine); @@ -1365,8 +1329,8 @@ namespace NNeh { if (error) { Hndl_->NotifyError(error, received, firstLine, headers); } else { - (*connection)->SetSsl(io.MoveSsl()); - ConnectionCache()->Release(connection); + io.Shutdown(); + SocketCache()->Release(*s); Hndl_->NotifyResponse(received, firstLine, headers); } } @@ -1948,7 +1912,7 @@ namespace NNeh { "invalid output fd limits; hardLimit=%" PRISZT ", softLimit=%" PRISZT, hardLimit, softLimit); - NHttps::ConnectionCache()->SetFdLimits(softLimit, hardLimit); + NHttps::SocketCache()->SetFdLimits(softLimit, hardLimit); } void SetHttpInputConnectionsLimits(size_t softLimit, size_t hardLimit) { |