aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-09-14 21:37:20 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-09-14 21:53:33 +0300
commit59dbf512fa4bb44d4873c2dd926eb95a57895472 (patch)
treec192d027d0b3b13b988947b91e63a2a961793cde /library/cpp/neh
parentc021d71a2bce52d110b473b1c0f224664fea5163 (diff)
downloadydb-59dbf512fa4bb44d4873c2dd926eb95a57895472.tar.gz
Intermediate changes
Diffstat (limited to 'library/cpp/neh')
-rw-r--r--library/cpp/neh/https.cpp162
1 files changed, 63 insertions, 99 deletions
diff --git a/library/cpp/neh/https.cpp b/library/cpp/neh/https.cpp
index 8a592bd141..d75b3a4b10 100644
--- a/library/cpp/neh/https.cpp
+++ b/library/cpp/neh/https.cpp
@@ -374,66 +374,39 @@ namespace NNeh {
}
class TConnCache;
- static TConnCache* ConnectionCache();
+ static TConnCache* SocketCache();
class TConnCache: public IThreadFactory::IThreadAble {
public:
- struct TConnection;
+ typedef TAutoLockFreeQueue<TSocketHolder> TConnList;
typedef TAutoPtr<TSocketHolder> TSocketRef;
- typedef THolder<TConnection> TConnectionHolder;
- typedef TAutoPtr<TConnectionHolder> TConnectionRef;
- typedef TAutoLockFreeQueue<TConnectionHolder> TConnList;
struct TConnection {
- inline TConnection(TSocketRef& s, const TResolvedHost host) noexcept
+ inline TConnection(TSocketRef& s, bool reUsed, const TResolvedHost* host) noexcept
: Socket(s)
+ , ReUsed(reUsed)
, Host(host)
{
- ConnectionCache()->ActiveSockets.Inc();
+ SocketCache()->ActiveSockets.Inc();
}
inline ~TConnection() {
if (!!Socket) {
- if (!!Ssl) {
- // gracefull shutdown
- BIO* bio = BIO_new_socket(*Socket, 0);
- SSL_set_bio(Ssl.Get(), bio, bio);
- for (size_t i = 0; i < 2; ++i) {
- bool rval = SSL_shutdown(Ssl.Get());
- if (0 == rval) {
- continue;
- } else if (1 == rval) {
- break;
- }
- }
- }
- ConnectionCache()->ActiveSockets.Dec();
+ SocketCache()->ActiveSockets.Dec();
}
}
- bool HasSsl() const {
- return Ssl.Get();
- }
-
- TSslHolder&& MoveSsl() {
- return std::move(Ssl);
- }
-
- void SetSsl(TSslHolder&& ssl) {
- Ssl = std::move(ssl);
- }
-
SOCKET Fd() {
return *Socket;
}
protected:
friend class TConnCache;
- TSslHolder Ssl;
TSocketRef Socket;
public:
- const TResolvedHost Host;
+ const bool ReUsed;
+ const TResolvedHost* Host;
};
TConnCache()
@@ -459,15 +432,15 @@ namespace NNeh {
class TConnector: public IJob {
public:
//create fresh connection
- TConnector(const TResolvedHost host)
+ TConnector(const TResolvedHost* host)
: Host_(host)
{
}
//continue connecting exist socket
- TConnector(const TResolvedHost host, TConnectionRef& connection)
+ TConnector(const TResolvedHost* host, TSocketRef& s)
: Host_(host)
- , Connection_(connection)
+ , S_(s)
{
}
@@ -475,16 +448,15 @@ namespace NNeh {
THolder<TConnector> This(this);
try {
- if (!Connection_) {
- TSocketRef socket(new TSocketHolder());
+ if (!S_) {
+ TSocketRef res(new TSocketHolder());
- for (TNetworkAddress::TIterator it = Host_.Addr.Begin(); it != Host_.Addr.End(); ++it) {
- int ret = NCoro::ConnectD(c, *socket, *it, TDuration::MilliSeconds(300).ToDeadLine());
+ for (TNetworkAddress::TIterator it = Host_->Addr.Begin(); it != Host_->Addr.End(); ++it) {
+ int ret = NCoro::ConnectD(c, *res, *it, TDuration::MilliSeconds(300).ToDeadLine());
if (!ret) {
- TConnectionRef res(new TConnectionHolder);
- res->Reset(new TConnection(socket, Host_));
- ConnectionCache()->Release(res);
+ TConnection tc(res, false, Host_);
+ SocketCache()->Release(tc);
return;
}
@@ -493,9 +465,9 @@ namespace NNeh {
}
}
} else {
- if (!NCoro::PollT(c, (*Connection_)->Fd(), CONT_POLL_WRITE, TDuration::MilliSeconds(300))) {
- ConnectionCache()->ActiveSockets.Inc();
- ConnectionCache()->Release(Connection_);
+ if (!NCoro::PollT(c, *S_, CONT_POLL_WRITE, TDuration::MilliSeconds(300))) {
+ TConnection tc(S_, false, Host_);
+ SocketCache()->Release(tc);
}
}
} catch (...) {
@@ -503,11 +475,11 @@ namespace NNeh {
}
private:
- const TResolvedHost Host_;
- TConnectionRef Connection_;
+ const TResolvedHost* Host_;
+ TSocketRef S_;
};
- TConnectionRef Connect(TCont* c, const TString& msgAddr, const TResolvedHost& addr, TErrorRef* error) {
+ TConnection* Connect(TCont* c, const TString& msgAddr, const TResolvedHost* addr, TErrorRef* error) {
if (ExceedHardLimit()) {
if (error) {
*error = new TError("neh::https output connections limit reached", TError::TType::UnknownType);
@@ -515,15 +487,17 @@ namespace NNeh {
return nullptr;
}
- TConnectionRef res;
+ TSocketRef res;
TConnList& connList = ConnList(addr);
while (connList.Dequeue(&res)) {
- ActiveSockets.Inc();
CachedSockets.Dec();
- if (IsNotSocketClosedByOtherSide((*res)->Fd())) {
+
+ if (IsNotSocketClosedByOtherSide(*res)) {
if (connList.Size() == 0) {
+ //available connections exhausted - try create yet one (reserve)
TAutoPtr<IJob> job(new TConnector(addr));
+
if (c) {
try {
c->Executor()->Create(*job, "https-con");
@@ -534,7 +508,7 @@ namespace NNeh {
JobQueue()->Schedule(job);
}
}
- return res;
+ return new TConnection(res, true, addr);
}
}
@@ -554,34 +528,36 @@ namespace NNeh {
} catch (...) {
}
+ TNetworkAddress::TIterator ait = addr->Addr.Begin();
+ res.Reset(new TSocketHolder(NCoro::Socket(*ait)));
const TInstant now(TInstant::Now());
const TInstant deadline(now + TDuration::Seconds(10));
TDuration delay = TDuration::MilliSeconds(8);
- TInstant checkpoint = Min(deadline, delay.ToDeadLine());
-
- TNetworkAddress::TIterator ait = addr.Addr.Begin();
- TSocketRef socket(new TSocketHolder(NCoro::Socket(*ait)));
- int ret = NCoro::ConnectD(c, *socket, *ait, deadline);
- res.Reset(new TConnectionHolder);
- res->Reset(new TConnection(socket, addr));
+ TInstant checkpoint = Min(deadline, now + delay);
+ int ret = NCoro::ConnectD(c, *res, ait->ai_addr, ait->ai_addrlen, checkpoint);
if (ret) {
do {
if ((ret == ETIMEDOUT || ret == EINTR) && checkpoint < deadline) {
delay += delay;
checkpoint = Min(deadline, now + delay);
- TConnectionRef res2;
+
+ TSocketRef res2;
+
if (connList.Dequeue(&res2)) {
- ActiveSockets.Inc();
CachedSockets.Dec();
- if (IsNotSocketClosedByOtherSide((*res2)->Fd())) {
+
+ if (IsNotSocketClosedByOtherSide(*res2)) {
try {
TAutoPtr<IJob> job(new TConnector(addr, res));
+
c->Executor()->Create(*job, "https-con");
Y_UNUSED(job.Release());
} catch (...) {
}
+
res = res2;
+
break;
}
}
@@ -591,20 +567,22 @@ namespace NNeh {
}
return nullptr;
}
- } while (ret = NCoro::PollD(c, (*res)->Fd(), CONT_POLL_WRITE, checkpoint));
+ } while (ret = NCoro::PollD(c, *res, CONT_POLL_WRITE, checkpoint));
}
- PrepareSocket((*res)->Fd());
- return res;
+
+ PrepareSocket(*res);
+
+ return new TConnection(res, false, addr);
}
- inline void Release(TConnectionRef conn) {
+ inline void Release(TConnection& conn) {
if (!ExceedHardLimit()) {
size_t maxConnId = MaxConnId_.load(std::memory_order_acquire);
- while (maxConnId < (*conn)->Host.Id) {
+ while (maxConnId < conn.Host->Id) {
MaxConnId_.compare_exchange_strong(
maxConnId,
- (*conn)->Host.Id,
+ conn.Host->Id,
std::memory_order_seq_cst,
std::memory_order_seq_cst);
maxConnId = MaxConnId_.load(std::memory_order_acquire);
@@ -613,7 +591,7 @@ namespace NNeh {
CachedSockets.Inc();
ActiveSockets.Dec();
- ConnList((*conn)->Host).Enqueue(conn);
+ ConnList(conn.Host).Enqueue(conn.Socket);
}
if (CachedSockets.Val() && ExceedSoftLimit()) {
@@ -681,7 +659,7 @@ namespace NNeh {
//try remove at least ExceedSoftLimit() oldest connections from cache
//вычисляем долю кеша, которую нужно почистить (в 256 долях) (но не менее 1/32 кеша)
size_t frac256 = Min(size_t(Max(size_t(256U / 32U), (ExceedSoftLimit() << 8) / (CachedSockets.Val() + 1))), (size_t)256U);
- TConnectionRef tmp;
+ TSocketRef tmp;
ui64 processed = 0;
for (size_t i = 0; i < MaxConnId_.load(std::memory_order_acquire) && !Shutdown_; i++) {
@@ -692,9 +670,9 @@ namespace NNeh {
if (!purgeCounter && qsize) {
if (qsize <= 2) {
- TConnectionRef res;
+ TSocketRef res;
if (tc.Dequeue(&res)) {
- if (IsNotSocketClosedByOtherSide((*res)->Fd())) {
+ if (IsNotSocketClosedByOtherSide(*res)) {
tc.Enqueue(res);
} else {
OnPurgeSocket(processed);
@@ -711,8 +689,8 @@ namespace NNeh {
}
}
- inline TConnList& ConnList(const TResolvedHost& addr) {
- return Lst_.Get(addr.Id);
+ inline TConnList& ConnList(const TResolvedHost* addr) {
+ return Lst_.Get(addr->Id);
}
inline size_t TotalSockets() const noexcept {
@@ -1203,16 +1181,6 @@ namespace NNeh {
{
}
- void SetSsl(TSslHolder&& ssl) {
- Ssl_ = std::move(ssl);
- BIO_up_ref(*Connection_);
- SSL_set_bio(Ssl_.Get(), *Connection_, *Connection_);
- }
-
- TSslHolder&& MoveSsl() {
- return std::move(Ssl_);
- }
-
void Handshake() override {
Ssl_.Reset(SSL_new(SslCtx_));
if (THttpsOptions::EnableSslClientDebug) {
@@ -1274,7 +1242,7 @@ namespace NNeh {
//TSslSessionHolder Session_;
};
- static TConnCache* ConnectionCache() {
+ static TConnCache* SocketCache() {
return Singleton<TConnCache>();
}
@@ -1327,24 +1295,20 @@ namespace NNeh {
}
TErrorRef error;
- TConnCache::TConnectionRef connection(ConnectionCache()->Connect(c, Msg_.Addr, *Addr_, &error));
- if (!connection) {
+ THolder<TConnCache::TConnection> s(SocketCache()->Connect(c, Msg_.Addr, Addr_, &error));
+ if (!s) {
Hndl_->NotifyError(error);
return;
}
- TSslClientIOStream io(TSslCtxClient::Instance(), Loc_, (*connection)->Fd(), Hndl_->CanceledPtr());
+ TSslClientIOStream io(TSslCtxClient::Instance(), Loc_, s->Fd(), Hndl_->CanceledPtr());
TContBIOWatcher w(io, c);
TString received;
THttpHeaders headers;
TString firstLine;
try {
- if ((*connection)->HasSsl()) {
- io.SetSsl((*connection)->MoveSsl());
- } else {
- io.Handshake();
- }
+ io.Handshake();
RequestData().SendTo(io);
Req_.Destroy();
error = ProcessRecv(io, &received, &headers, &firstLine);
@@ -1365,8 +1329,8 @@ namespace NNeh {
if (error) {
Hndl_->NotifyError(error, received, firstLine, headers);
} else {
- (*connection)->SetSsl(io.MoveSsl());
- ConnectionCache()->Release(connection);
+ io.Shutdown();
+ SocketCache()->Release(*s);
Hndl_->NotifyResponse(received, firstLine, headers);
}
}
@@ -1948,7 +1912,7 @@ namespace NNeh {
"invalid output fd limits; hardLimit=%" PRISZT ", softLimit=%" PRISZT,
hardLimit, softLimit);
- NHttps::ConnectionCache()->SetFdLimits(softLimit, hardLimit);
+ NHttps::SocketCache()->SetFdLimits(softLimit, hardLimit);
}
void SetHttpInputConnectionsLimits(size_t softLimit, size_t hardLimit) {