aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-09-25 18:13:46 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-09-25 18:47:05 +0300
commit2d0bcb5ea8b618455e529bd1ab99bfb4e87830c7 (patch)
tree0bdfcba692d19d3d4fb2044efc5765a46fd323a6
parent79c0d84bc00a8f9baccfdb6e0165222d182ab89f (diff)
downloadydb-2d0bcb5ea8b618455e529bd1ab99bfb4e87830c7.tar.gz
Intermediate changes
-rw-r--r--library/cpp/neh/https.cpp280
1 files changed, 122 insertions, 158 deletions
diff --git a/library/cpp/neh/https.cpp b/library/cpp/neh/https.cpp
index d75b3a4b10..a5b0920d97 100644
--- a/library/cpp/neh/https.cpp
+++ b/library/cpp/neh/https.cpp
@@ -374,39 +374,77 @@ namespace NNeh {
}
class TConnCache;
- static TConnCache* SocketCache();
+ static TConnCache* ConnectionCache();
class TConnCache: public IThreadFactory::IThreadAble {
public:
- typedef TAutoLockFreeQueue<TSocketHolder> TConnList;
+ struct TConnection;
typedef TAutoPtr<TSocketHolder> TSocketRef;
+ typedef THolder<TConnection> TConnectionHolder;
+ typedef TAutoPtr<TConnectionHolder> TConnectionRef;
+ typedef TAutoLockFreeQueue<TConnectionHolder> TConnList;
struct TConnection {
- inline TConnection(TSocketRef& s, bool reUsed, const TResolvedHost* host) noexcept
+ inline TConnection(TSocketRef& s, const TResolvedHost host) noexcept
: Socket(s)
- , ReUsed(reUsed)
, Host(host)
{
- SocketCache()->ActiveSockets.Inc();
+ ConnectionCache()->ActiveConnections.Inc();
}
inline ~TConnection() {
+ if (!!Socket && IsNotSocketClosedByOtherSide(*Socket)) {
+ if (!!Ssl) {
+ ResetBIO();
+ // do not wait for shutdown confirmation
+ Y_UNUSED(SSL_shutdown(Ssl.Get()));
+ }
+ }
+ ConnectionCache()->ActiveConnections.Dec();
+ }
+
+ void ResetBIO() {
if (!!Socket) {
- SocketCache()->ActiveSockets.Dec();
+ BIO* bio = BIO_new_socket(*Socket, 0);
+ SSL_set_bio(Ssl.Get(), bio, bio);
}
}
+ bool HasSsl() const {
+ return Ssl.Get();
+ }
+
+ TSslHolder&& MoveSsl() {
+ return std::move(Ssl);
+ }
+
+ void SetSsl(TSslHolder&& ssl) {
+ Ssl = std::move(ssl);
+ }
+
+ bool ShutdownReceived() {
+ if (!Ssl) {
+ return false;
+ }
+ char buffer;
+ int rval = SSL_peek(Ssl.Get(), &buffer, sizeof(buffer));
+ if (rval) {
+ return false;
+ }
+ return (SSL_get_shutdown(Ssl.Get()) & SSL_RECEIVED_SHUTDOWN);
+ }
+
SOCKET Fd() {
return *Socket;
}
protected:
friend class TConnCache;
+ TSslHolder Ssl;
TSocketRef Socket;
public:
- const bool ReUsed;
- const TResolvedHost* Host;
+ const TResolvedHost Host;
};
TConnCache()
@@ -428,58 +466,7 @@ namespace NNeh {
T_->Join();
}
- //used for forwarding filling cache
- class TConnector: public IJob {
- public:
- //create fresh connection
- TConnector(const TResolvedHost* host)
- : Host_(host)
- {
- }
-
- //continue connecting exist socket
- TConnector(const TResolvedHost* host, TSocketRef& s)
- : Host_(host)
- , S_(s)
- {
- }
-
- void DoRun(TCont* c) override {
- THolder<TConnector> This(this);
-
- try {
- if (!S_) {
- TSocketRef res(new TSocketHolder());
-
- for (TNetworkAddress::TIterator it = Host_->Addr.Begin(); it != Host_->Addr.End(); ++it) {
- int ret = NCoro::ConnectD(c, *res, *it, TDuration::MilliSeconds(300).ToDeadLine());
-
- if (!ret) {
- TConnection tc(res, false, Host_);
- SocketCache()->Release(tc);
- return;
- }
-
- if (ret == ECANCELED) {
- return;
- }
- }
- } else {
- if (!NCoro::PollT(c, *S_, CONT_POLL_WRITE, TDuration::MilliSeconds(300))) {
- TConnection tc(S_, false, Host_);
- SocketCache()->Release(tc);
- }
- }
- } catch (...) {
- }
- }
-
- private:
- const TResolvedHost* Host_;
- TSocketRef S_;
- };
-
- TConnection* Connect(TCont* c, const TString& msgAddr, const TResolvedHost* addr, TErrorRef* error) {
+ TConnectionRef Connect(TCont* c, const TString& msgAddr, const TResolvedHost& addr, TErrorRef* error) {
if (ExceedHardLimit()) {
if (error) {
*error = new TError("neh::https output connections limit reached", TError::TType::UnknownType);
@@ -487,28 +474,14 @@ namespace NNeh {
return nullptr;
}
- TSocketRef res;
+ TConnectionRef res;
TConnList& connList = ConnList(addr);
while (connList.Dequeue(&res)) {
- CachedSockets.Dec();
-
- if (IsNotSocketClosedByOtherSide(*res)) {
- if (connList.Size() == 0) {
- //available connections exhausted - try create yet one (reserve)
- TAutoPtr<IJob> job(new TConnector(addr));
-
- if (c) {
- try {
- c->Executor()->Create(*job, "https-con");
- Y_UNUSED(job.Release());
- } catch (...) {
- }
- } else {
- JobQueue()->Schedule(job);
- }
- }
- return new TConnection(res, true, addr);
+ ActiveConnections.Inc();
+ CachedConnections.Dec();
+ if (IsNotSocketClosedByOtherSide((*res)->Fd()) && !(*res)->ShutdownReceived()) {
+ return res;
}
}
@@ -519,46 +492,28 @@ namespace NNeh {
return nullptr;
}
- try {
- //run reserve/concurrent connecting
- TAutoPtr<IJob> job(new TConnector(addr));
-
- c->Executor()->Create(*job, "https-con");
- Y_UNUSED(job.Release());
- } catch (...) {
- }
-
- TNetworkAddress::TIterator ait = addr->Addr.Begin();
- res.Reset(new TSocketHolder(NCoro::Socket(*ait)));
const TInstant now(TInstant::Now());
const TInstant deadline(now + TDuration::Seconds(10));
TDuration delay = TDuration::MilliSeconds(8);
- TInstant checkpoint = Min(deadline, now + delay);
- int ret = NCoro::ConnectD(c, *res, ait->ai_addr, ait->ai_addrlen, checkpoint);
+ TInstant checkpoint = Min(deadline, delay.ToDeadLine());
+
+ TNetworkAddress::TIterator ait = addr.Addr.Begin();
+ TSocketRef socket(new TSocketHolder(NCoro::Socket(*ait)));
+ int ret = NCoro::ConnectD(c, *socket, *ait, deadline);
+ res.Reset(new TConnectionHolder);
+ res->Reset(new TConnection(socket, addr));
if (ret) {
do {
if ((ret == ETIMEDOUT || ret == EINTR) && checkpoint < deadline) {
delay += delay;
checkpoint = Min(deadline, now + delay);
-
- TSocketRef res2;
-
+ TConnectionRef res2;
if (connList.Dequeue(&res2)) {
- CachedSockets.Dec();
-
- if (IsNotSocketClosedByOtherSide(*res2)) {
- try {
- TAutoPtr<IJob> job(new TConnector(addr, res));
-
- c->Executor()->Create(*job, "https-con");
- Y_UNUSED(job.Release());
- } catch (...) {
- }
-
- res = res2;
-
- break;
+ ActiveConnections.Inc();
+ CachedConnections.Dec();
+ if (IsNotSocketClosedByOtherSide((*res2)->Fd()) && !(*res)->ShutdownReceived()) {
+ return res2;
}
}
} else {
@@ -567,34 +522,30 @@ namespace NNeh {
}
return nullptr;
}
- } while (ret = NCoro::PollD(c, *res, CONT_POLL_WRITE, checkpoint));
+ } while (ret = NCoro::PollD(c, (*res)->Fd(), CONT_POLL_WRITE, checkpoint));
}
-
- PrepareSocket(*res);
-
- return new TConnection(res, false, addr);
+ PrepareSocket((*res)->Fd());
+ return res;
}
- inline void Release(TConnection& conn) {
+ inline void Release(TConnectionRef conn) {
if (!ExceedHardLimit()) {
size_t maxConnId = MaxConnId_.load(std::memory_order_acquire);
- while (maxConnId < conn.Host->Id) {
+ while (maxConnId < (*conn)->Host.Id) {
MaxConnId_.compare_exchange_strong(
maxConnId,
- conn.Host->Id,
+ (*conn)->Host.Id,
std::memory_order_seq_cst,
std::memory_order_seq_cst);
maxConnId = MaxConnId_.load(std::memory_order_acquire);
}
-
- CachedSockets.Inc();
- ActiveSockets.Dec();
-
- ConnList(conn.Host).Enqueue(conn.Socket);
+ ConnList((*conn)->Host).Enqueue(conn);
+ CachedConnections.Inc();
+ ActiveConnections.Dec();
}
- if (CachedSockets.Val() && ExceedSoftLimit()) {
+ if (CachedConnections.Val() && ExceedSoftLimit()) {
SuggestPurgeCache();
}
}
@@ -609,11 +560,11 @@ namespace NNeh {
if (AtomicTryLock(&InPurging_)) {
//evaluate the usefulness of purging the cache
//если в кеше мало соединений (< MaxConnId_/16 или 64), не чистим кеш
- if ((size_t)CachedSockets.Val() > (Min((size_t)MaxConnId_.load(std::memory_order_acquire), (size_t)1024U) >> 4)) {
+ if ((size_t)CachedConnections.Val() > (Min((size_t)MaxConnId_.load(std::memory_order_acquire), (size_t)1024U) >> 4)) {
//по мере приближения к hardlimit нужда в чистке cache приближается к 100%
- size_t closenessToHardLimit256 = ((ActiveSockets.Val() + 1) << 8) / (Limits.Delta() + 1);
+ size_t closenessToHardLimit256 = ((ActiveConnections.Val() + 1) << 8) / (Limits.Delta() + 1);
//чем больше соединений в кеше, а не в работе, тем менее нужен кеш (можно его почистить)
- size_t cacheUselessness256 = ((CachedSockets.Val() + 1) << 8) / (ActiveSockets.Val() + 1);
+ size_t cacheUselessness256 = ((CachedConnections.Val() + 1) << 8) / (ActiveConnections.Val() + 1);
//итого, - пороги срабатывания:
//при достижении soft-limit, если соединения в кеше, а не в работе
@@ -647,21 +598,12 @@ namespace NNeh {
}
}
- inline void OnPurgeSocket(ui64& processed) {
- CachedSockets.Dec();
- if ((processed++ & 0x3f) == 0) {
- //suspend execution every 64 processed socket (clean rate ~= 6400 sockets/sec)
- Sleep(TDuration::MilliSeconds(10));
- }
- }
-
void PurgeCache() noexcept {
//try remove at least ExceedSoftLimit() oldest connections from cache
//вычисляем долю кеша, которую нужно почистить (в 256 долях) (но не менее 1/32 кеша)
- size_t frac256 = Min(size_t(Max(size_t(256U / 32U), (ExceedSoftLimit() << 8) / (CachedSockets.Val() + 1))), (size_t)256U);
- TSocketRef tmp;
+ const size_t frac256 = Min<size_t>(256, Max<size_t>(8, (ExceedSoftLimit() << 8) / (CachedConnections.Val() + 1)));
+ TConnectionRef tmp;
- ui64 processed = 0;
for (size_t i = 0; i < MaxConnId_.load(std::memory_order_acquire) && !Shutdown_; i++) {
TConnList& tc = Lst_.Get(i);
if (size_t qsize = tc.Size()) {
@@ -669,45 +611,52 @@ namespace NNeh {
size_t purgeCounter = ((qsize * frac256) >> 8);
if (!purgeCounter && qsize) {
- if (qsize <= 2) {
- TSocketRef res;
+ if (qsize == 1) {
+ // check lifeness
+ TConnectionRef res;
if (tc.Dequeue(&res)) {
- if (IsNotSocketClosedByOtherSide(*res)) {
+ // if connection valid put it back
+ if (IsNotSocketClosedByOtherSide((*res)->Fd()) && !(*res)->ShutdownReceived()) {
tc.Enqueue(res);
} else {
- OnPurgeSocket(processed);
+ ActiveConnections.Inc();
+ CachedConnections.Dec();
}
}
} else {
+ // drop at least one connection from queue with at least 2 connections
purgeCounter = 1;
}
}
+
while (purgeCounter-- && tc.Dequeue(&tmp)) {
- OnPurgeSocket(processed);
+ ActiveConnections.Inc();
+ CachedConnections.Dec();
+ tmp->Reset(nullptr);
}
}
}
}
- inline TConnList& ConnList(const TResolvedHost* addr) {
- return Lst_.Get(addr->Id);
+ inline TConnList& ConnList(const TResolvedHost& addr) {
+ return Lst_.Get(addr.Id);
}
- inline size_t TotalSockets() const noexcept {
- return ActiveSockets.Val() + CachedSockets.Val();
+ inline size_t TotalConnections() const noexcept {
+ return ActiveConnections.Val() + CachedConnections.Val();
}
inline size_t ExceedSoftLimit() const noexcept {
- return NHttp::TFdLimits::ExceedLimit(TotalSockets(), Limits.Soft());
+ return NHttp::TFdLimits::ExceedLimit(TotalConnections(), Limits.Soft());
}
inline size_t ExceedHardLimit() const noexcept {
- return NHttp::TFdLimits::ExceedLimit(TotalSockets(), Limits.Hard());
+ return NHttp::TFdLimits::ExceedLimit(TotalConnections(), Limits.Hard());
}
NHttp::TFdLimits Limits;
- TAtomicCounter ActiveSockets;
- TAtomicCounter CachedSockets;
+ TAtomicCounter ActiveConnections;
+ TAtomicCounter CachedConnections;
NHttp::TLockFreeSequence<TConnList> Lst_;
@@ -1181,6 +1130,16 @@ namespace NNeh {
{
}
+ void SetSsl(TSslHolder&& ssl) {
+ Ssl_ = std::move(ssl);
+ BIO_up_ref(*Connection_);
+ SSL_set_bio(Ssl_.Get(), *Connection_, *Connection_);
+ }
+
+ TSslHolder&& MoveSsl() {
+ return std::move(Ssl_);
+ }
+
void Handshake() override {
Ssl_.Reset(SSL_new(SslCtx_));
if (THttpsOptions::EnableSslClientDebug) {
@@ -1242,7 +1201,7 @@ namespace NNeh {
//TSslSessionHolder Session_;
};
- static TConnCache* SocketCache() {
+ static TConnCache* ConnectionCache() {
return Singleton<TConnCache>();
}
@@ -1295,23 +1254,29 @@ namespace NNeh {
}
TErrorRef error;
- THolder<TConnCache::TConnection> s(SocketCache()->Connect(c, Msg_.Addr, Addr_, &error));
- if (!s) {
+ TConnCache::TConnectionRef connection(ConnectionCache()->Connect(c, Msg_.Addr, *Addr_, &error));
+ if (!connection) {
Hndl_->NotifyError(error);
return;
}
- TSslClientIOStream io(TSslCtxClient::Instance(), Loc_, s->Fd(), Hndl_->CanceledPtr());
+ TSslClientIOStream io(TSslCtxClient::Instance(), Loc_, (*connection)->Fd(), Hndl_->CanceledPtr());
TContBIOWatcher w(io, c);
TString received;
THttpHeaders headers;
TString firstLine;
try {
- io.Handshake();
+ if ((*connection)->HasSsl()) {
+ io.SetSsl((*connection)->MoveSsl());
+ } else {
+ io.Handshake();
+ }
RequestData().SendTo(io);
Req_.Destroy();
error = ProcessRecv(io, &received, &headers, &firstLine);
+ (*connection)->SetSsl(io.MoveSsl());
+ (*connection)->ResetBIO();
} catch (const TSystemError& e) {
if (c->Cancelled() || e.Status() == ECANCELED) {
error = new TError("canceled", TError::TType::Cancelled);
@@ -1329,8 +1294,7 @@ namespace NNeh {
if (error) {
Hndl_->NotifyError(error, received, firstLine, headers);
} else {
- io.Shutdown();
- SocketCache()->Release(*s);
+ ConnectionCache()->Release(connection);
Hndl_->NotifyResponse(received, firstLine, headers);
}
}
@@ -1912,7 +1876,7 @@ namespace NNeh {
"invalid output fd limits; hardLimit=%" PRISZT ", softLimit=%" PRISZT,
hardLimit, softLimit);
- NHttps::SocketCache()->SetFdLimits(softLimit, hardLimit);
+ NHttps::ConnectionCache()->SetFdLimits(softLimit, hardLimit);
}
void SetHttpInputConnectionsLimits(size_t softLimit, size_t hardLimit) {