diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-09-05 19:00:09 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-09-05 20:08:25 +0300 |
commit | c3106e520c068a6b0e971bbca7a16f63d2a38045 (patch) | |
tree | 891cc8e321855bf2c6b48533d2561972759c1f25 /library/cpp | |
parent | 2fa5f05ca5559646c5f729dfc6a5d4290d679636 (diff) | |
download | ydb-c3106e520c068a6b0e971bbca7a16f63d2a38045.tar.gz |
Intermediate changes
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/neh/https.cpp | 162 |
1 files changed, 99 insertions, 63 deletions
diff --git a/library/cpp/neh/https.cpp b/library/cpp/neh/https.cpp index d75b3a4b10..8a592bd141 100644 --- a/library/cpp/neh/https.cpp +++ b/library/cpp/neh/https.cpp @@ -374,39 +374,66 @@ namespace NNeh { } class TConnCache; - static TConnCache* SocketCache(); + static TConnCache* ConnectionCache(); class TConnCache: public IThreadFactory::IThreadAble { public: - typedef TAutoLockFreeQueue<TSocketHolder> TConnList; + struct TConnection; typedef TAutoPtr<TSocketHolder> TSocketRef; + typedef THolder<TConnection> TConnectionHolder; + typedef TAutoPtr<TConnectionHolder> TConnectionRef; + typedef TAutoLockFreeQueue<TConnectionHolder> TConnList; struct TConnection { - inline TConnection(TSocketRef& s, bool reUsed, const TResolvedHost* host) noexcept + inline TConnection(TSocketRef& s, const TResolvedHost host) noexcept : Socket(s) - , ReUsed(reUsed) , Host(host) { - SocketCache()->ActiveSockets.Inc(); + ConnectionCache()->ActiveSockets.Inc(); } inline ~TConnection() { if (!!Socket) { - SocketCache()->ActiveSockets.Dec(); + if (!!Ssl) { + // gracefull shutdown + BIO* bio = BIO_new_socket(*Socket, 0); + SSL_set_bio(Ssl.Get(), bio, bio); + for (size_t i = 0; i < 2; ++i) { + bool rval = SSL_shutdown(Ssl.Get()); + if (0 == rval) { + continue; + } else if (1 == rval) { + break; + } + } + } + ConnectionCache()->ActiveSockets.Dec(); } } + bool HasSsl() const { + return Ssl.Get(); + } + + TSslHolder&& MoveSsl() { + return std::move(Ssl); + } + + void SetSsl(TSslHolder&& ssl) { + Ssl = std::move(ssl); + } + SOCKET Fd() { return *Socket; } protected: friend class TConnCache; + TSslHolder Ssl; TSocketRef Socket; public: - const bool ReUsed; - const TResolvedHost* Host; + const TResolvedHost Host; }; TConnCache() @@ -432,15 +459,15 @@ namespace NNeh { class TConnector: public IJob { public: //create fresh connection - TConnector(const TResolvedHost* host) + TConnector(const TResolvedHost host) : Host_(host) { } //continue connecting exist socket - TConnector(const TResolvedHost* host, TSocketRef& s) + TConnector(const TResolvedHost host, TConnectionRef& connection) : Host_(host) - , S_(s) + , Connection_(connection) { } @@ -448,15 +475,16 @@ namespace NNeh { THolder<TConnector> This(this); try { - if (!S_) { - TSocketRef res(new TSocketHolder()); + if (!Connection_) { + TSocketRef socket(new TSocketHolder()); - for (TNetworkAddress::TIterator it = Host_->Addr.Begin(); it != Host_->Addr.End(); ++it) { - int ret = NCoro::ConnectD(c, *res, *it, TDuration::MilliSeconds(300).ToDeadLine()); + for (TNetworkAddress::TIterator it = Host_.Addr.Begin(); it != Host_.Addr.End(); ++it) { + int ret = NCoro::ConnectD(c, *socket, *it, TDuration::MilliSeconds(300).ToDeadLine()); if (!ret) { - TConnection tc(res, false, Host_); - SocketCache()->Release(tc); + TConnectionRef res(new TConnectionHolder); + res->Reset(new TConnection(socket, Host_)); + ConnectionCache()->Release(res); return; } @@ -465,9 +493,9 @@ namespace NNeh { } } } else { - if (!NCoro::PollT(c, *S_, CONT_POLL_WRITE, TDuration::MilliSeconds(300))) { - TConnection tc(S_, false, Host_); - SocketCache()->Release(tc); + if (!NCoro::PollT(c, (*Connection_)->Fd(), CONT_POLL_WRITE, TDuration::MilliSeconds(300))) { + ConnectionCache()->ActiveSockets.Inc(); + ConnectionCache()->Release(Connection_); } } } catch (...) { @@ -475,11 +503,11 @@ namespace NNeh { } private: - const TResolvedHost* Host_; - TSocketRef S_; + const TResolvedHost Host_; + TConnectionRef Connection_; }; - TConnection* Connect(TCont* c, const TString& msgAddr, const TResolvedHost* addr, TErrorRef* error) { + TConnectionRef Connect(TCont* c, const TString& msgAddr, const TResolvedHost& addr, TErrorRef* error) { if (ExceedHardLimit()) { if (error) { *error = new TError("neh::https output connections limit reached", TError::TType::UnknownType); @@ -487,17 +515,15 @@ namespace NNeh { return nullptr; } - TSocketRef res; + TConnectionRef res; TConnList& connList = ConnList(addr); while (connList.Dequeue(&res)) { + ActiveSockets.Inc(); CachedSockets.Dec(); - - if (IsNotSocketClosedByOtherSide(*res)) { + if (IsNotSocketClosedByOtherSide((*res)->Fd())) { if (connList.Size() == 0) { - //available connections exhausted - try create yet one (reserve) TAutoPtr<IJob> job(new TConnector(addr)); - if (c) { try { c->Executor()->Create(*job, "https-con"); @@ -508,7 +534,7 @@ namespace NNeh { JobQueue()->Schedule(job); } } - return new TConnection(res, true, addr); + return res; } } @@ -528,36 +554,34 @@ namespace NNeh { } catch (...) { } - TNetworkAddress::TIterator ait = addr->Addr.Begin(); - res.Reset(new TSocketHolder(NCoro::Socket(*ait))); const TInstant now(TInstant::Now()); const TInstant deadline(now + TDuration::Seconds(10)); TDuration delay = TDuration::MilliSeconds(8); - TInstant checkpoint = Min(deadline, now + delay); - int ret = NCoro::ConnectD(c, *res, ait->ai_addr, ait->ai_addrlen, checkpoint); + TInstant checkpoint = Min(deadline, delay.ToDeadLine()); + + TNetworkAddress::TIterator ait = addr.Addr.Begin(); + TSocketRef socket(new TSocketHolder(NCoro::Socket(*ait))); + int ret = NCoro::ConnectD(c, *socket, *ait, deadline); + res.Reset(new TConnectionHolder); + res->Reset(new TConnection(socket, addr)); if (ret) { do { if ((ret == ETIMEDOUT || ret == EINTR) && checkpoint < deadline) { delay += delay; checkpoint = Min(deadline, now + delay); - - TSocketRef res2; - + TConnectionRef res2; if (connList.Dequeue(&res2)) { + ActiveSockets.Inc(); CachedSockets.Dec(); - - if (IsNotSocketClosedByOtherSide(*res2)) { + if (IsNotSocketClosedByOtherSide((*res2)->Fd())) { try { TAutoPtr<IJob> job(new TConnector(addr, res)); - c->Executor()->Create(*job, "https-con"); Y_UNUSED(job.Release()); } catch (...) { } - res = res2; - break; } } @@ -567,22 +591,20 @@ namespace NNeh { } return nullptr; } - } while (ret = NCoro::PollD(c, *res, CONT_POLL_WRITE, checkpoint)); + } while (ret = NCoro::PollD(c, (*res)->Fd(), CONT_POLL_WRITE, checkpoint)); } - - PrepareSocket(*res); - - return new TConnection(res, false, addr); + PrepareSocket((*res)->Fd()); + return res; } - inline void Release(TConnection& conn) { + inline void Release(TConnectionRef conn) { if (!ExceedHardLimit()) { size_t maxConnId = MaxConnId_.load(std::memory_order_acquire); - while (maxConnId < conn.Host->Id) { + while (maxConnId < (*conn)->Host.Id) { MaxConnId_.compare_exchange_strong( maxConnId, - conn.Host->Id, + (*conn)->Host.Id, std::memory_order_seq_cst, std::memory_order_seq_cst); maxConnId = MaxConnId_.load(std::memory_order_acquire); @@ -591,7 +613,7 @@ namespace NNeh { CachedSockets.Inc(); ActiveSockets.Dec(); - ConnList(conn.Host).Enqueue(conn.Socket); + ConnList((*conn)->Host).Enqueue(conn); } if (CachedSockets.Val() && ExceedSoftLimit()) { @@ -659,7 +681,7 @@ namespace NNeh { //try remove at least ExceedSoftLimit() oldest connections from cache //вычисляем долю кеша, которую нужно почистить (в 256 долях) (но не менее 1/32 кеша) size_t frac256 = Min(size_t(Max(size_t(256U / 32U), (ExceedSoftLimit() << 8) / (CachedSockets.Val() + 1))), (size_t)256U); - TSocketRef tmp; + TConnectionRef tmp; ui64 processed = 0; for (size_t i = 0; i < MaxConnId_.load(std::memory_order_acquire) && !Shutdown_; i++) { @@ -670,9 +692,9 @@ namespace NNeh { if (!purgeCounter && qsize) { if (qsize <= 2) { - TSocketRef res; + TConnectionRef res; if (tc.Dequeue(&res)) { - if (IsNotSocketClosedByOtherSide(*res)) { + if (IsNotSocketClosedByOtherSide((*res)->Fd())) { tc.Enqueue(res); } else { OnPurgeSocket(processed); @@ -689,8 +711,8 @@ namespace NNeh { } } - inline TConnList& ConnList(const TResolvedHost* addr) { - return Lst_.Get(addr->Id); + inline TConnList& ConnList(const TResolvedHost& addr) { + return Lst_.Get(addr.Id); } inline size_t TotalSockets() const noexcept { @@ -1181,6 +1203,16 @@ namespace NNeh { { } + void SetSsl(TSslHolder&& ssl) { + Ssl_ = std::move(ssl); + BIO_up_ref(*Connection_); + SSL_set_bio(Ssl_.Get(), *Connection_, *Connection_); + } + + TSslHolder&& MoveSsl() { + return std::move(Ssl_); + } + void Handshake() override { Ssl_.Reset(SSL_new(SslCtx_)); if (THttpsOptions::EnableSslClientDebug) { @@ -1242,7 +1274,7 @@ namespace NNeh { //TSslSessionHolder Session_; }; - static TConnCache* SocketCache() { + static TConnCache* ConnectionCache() { return Singleton<TConnCache>(); } @@ -1295,20 +1327,24 @@ namespace NNeh { } TErrorRef error; - THolder<TConnCache::TConnection> s(SocketCache()->Connect(c, Msg_.Addr, Addr_, &error)); - if (!s) { + TConnCache::TConnectionRef connection(ConnectionCache()->Connect(c, Msg_.Addr, *Addr_, &error)); + if (!connection) { Hndl_->NotifyError(error); return; } - TSslClientIOStream io(TSslCtxClient::Instance(), Loc_, s->Fd(), Hndl_->CanceledPtr()); + TSslClientIOStream io(TSslCtxClient::Instance(), Loc_, (*connection)->Fd(), Hndl_->CanceledPtr()); TContBIOWatcher w(io, c); TString received; THttpHeaders headers; TString firstLine; try { - io.Handshake(); + if ((*connection)->HasSsl()) { + io.SetSsl((*connection)->MoveSsl()); + } else { + io.Handshake(); + } RequestData().SendTo(io); Req_.Destroy(); error = ProcessRecv(io, &received, &headers, &firstLine); @@ -1329,8 +1365,8 @@ namespace NNeh { if (error) { Hndl_->NotifyError(error, received, firstLine, headers); } else { - io.Shutdown(); - SocketCache()->Release(*s); + (*connection)->SetSsl(io.MoveSsl()); + ConnectionCache()->Release(connection); Hndl_->NotifyResponse(received, firstLine, headers); } } @@ -1912,7 +1948,7 @@ namespace NNeh { "invalid output fd limits; hardLimit=%" PRISZT ", softLimit=%" PRISZT, hardLimit, softLimit); - NHttps::SocketCache()->SetFdLimits(softLimit, hardLimit); + NHttps::ConnectionCache()->SetFdLimits(softLimit, hardLimit); } void SetHttpInputConnectionsLimits(size_t softLimit, size_t hardLimit) { |