aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-09-05 19:00:09 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-09-05 20:08:25 +0300
commitc3106e520c068a6b0e971bbca7a16f63d2a38045 (patch)
tree891cc8e321855bf2c6b48533d2561972759c1f25 /library/cpp
parent2fa5f05ca5559646c5f729dfc6a5d4290d679636 (diff)
downloadydb-c3106e520c068a6b0e971bbca7a16f63d2a38045.tar.gz
Intermediate changes
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/neh/https.cpp162
1 files changed, 99 insertions, 63 deletions
diff --git a/library/cpp/neh/https.cpp b/library/cpp/neh/https.cpp
index d75b3a4b10..8a592bd141 100644
--- a/library/cpp/neh/https.cpp
+++ b/library/cpp/neh/https.cpp
@@ -374,39 +374,66 @@ namespace NNeh {
}
class TConnCache;
- static TConnCache* SocketCache();
+ static TConnCache* ConnectionCache();
class TConnCache: public IThreadFactory::IThreadAble {
public:
- typedef TAutoLockFreeQueue<TSocketHolder> TConnList;
+ struct TConnection;
typedef TAutoPtr<TSocketHolder> TSocketRef;
+ typedef THolder<TConnection> TConnectionHolder;
+ typedef TAutoPtr<TConnectionHolder> TConnectionRef;
+ typedef TAutoLockFreeQueue<TConnectionHolder> TConnList;
struct TConnection {
- inline TConnection(TSocketRef& s, bool reUsed, const TResolvedHost* host) noexcept
+ inline TConnection(TSocketRef& s, const TResolvedHost host) noexcept
: Socket(s)
- , ReUsed(reUsed)
, Host(host)
{
- SocketCache()->ActiveSockets.Inc();
+ ConnectionCache()->ActiveSockets.Inc();
}
inline ~TConnection() {
if (!!Socket) {
- SocketCache()->ActiveSockets.Dec();
+ if (!!Ssl) {
+ // gracefull shutdown
+ BIO* bio = BIO_new_socket(*Socket, 0);
+ SSL_set_bio(Ssl.Get(), bio, bio);
+ for (size_t i = 0; i < 2; ++i) {
+ bool rval = SSL_shutdown(Ssl.Get());
+ if (0 == rval) {
+ continue;
+ } else if (1 == rval) {
+ break;
+ }
+ }
+ }
+ ConnectionCache()->ActiveSockets.Dec();
}
}
+ bool HasSsl() const {
+ return Ssl.Get();
+ }
+
+ TSslHolder&& MoveSsl() {
+ return std::move(Ssl);
+ }
+
+ void SetSsl(TSslHolder&& ssl) {
+ Ssl = std::move(ssl);
+ }
+
SOCKET Fd() {
return *Socket;
}
protected:
friend class TConnCache;
+ TSslHolder Ssl;
TSocketRef Socket;
public:
- const bool ReUsed;
- const TResolvedHost* Host;
+ const TResolvedHost Host;
};
TConnCache()
@@ -432,15 +459,15 @@ namespace NNeh {
class TConnector: public IJob {
public:
//create fresh connection
- TConnector(const TResolvedHost* host)
+ TConnector(const TResolvedHost host)
: Host_(host)
{
}
//continue connecting exist socket
- TConnector(const TResolvedHost* host, TSocketRef& s)
+ TConnector(const TResolvedHost host, TConnectionRef& connection)
: Host_(host)
- , S_(s)
+ , Connection_(connection)
{
}
@@ -448,15 +475,16 @@ namespace NNeh {
THolder<TConnector> This(this);
try {
- if (!S_) {
- TSocketRef res(new TSocketHolder());
+ if (!Connection_) {
+ TSocketRef socket(new TSocketHolder());
- for (TNetworkAddress::TIterator it = Host_->Addr.Begin(); it != Host_->Addr.End(); ++it) {
- int ret = NCoro::ConnectD(c, *res, *it, TDuration::MilliSeconds(300).ToDeadLine());
+ for (TNetworkAddress::TIterator it = Host_.Addr.Begin(); it != Host_.Addr.End(); ++it) {
+ int ret = NCoro::ConnectD(c, *socket, *it, TDuration::MilliSeconds(300).ToDeadLine());
if (!ret) {
- TConnection tc(res, false, Host_);
- SocketCache()->Release(tc);
+ TConnectionRef res(new TConnectionHolder);
+ res->Reset(new TConnection(socket, Host_));
+ ConnectionCache()->Release(res);
return;
}
@@ -465,9 +493,9 @@ namespace NNeh {
}
}
} else {
- if (!NCoro::PollT(c, *S_, CONT_POLL_WRITE, TDuration::MilliSeconds(300))) {
- TConnection tc(S_, false, Host_);
- SocketCache()->Release(tc);
+ if (!NCoro::PollT(c, (*Connection_)->Fd(), CONT_POLL_WRITE, TDuration::MilliSeconds(300))) {
+ ConnectionCache()->ActiveSockets.Inc();
+ ConnectionCache()->Release(Connection_);
}
}
} catch (...) {
@@ -475,11 +503,11 @@ namespace NNeh {
}
private:
- const TResolvedHost* Host_;
- TSocketRef S_;
+ const TResolvedHost Host_;
+ TConnectionRef Connection_;
};
- TConnection* Connect(TCont* c, const TString& msgAddr, const TResolvedHost* addr, TErrorRef* error) {
+ TConnectionRef Connect(TCont* c, const TString& msgAddr, const TResolvedHost& addr, TErrorRef* error) {
if (ExceedHardLimit()) {
if (error) {
*error = new TError("neh::https output connections limit reached", TError::TType::UnknownType);
@@ -487,17 +515,15 @@ namespace NNeh {
return nullptr;
}
- TSocketRef res;
+ TConnectionRef res;
TConnList& connList = ConnList(addr);
while (connList.Dequeue(&res)) {
+ ActiveSockets.Inc();
CachedSockets.Dec();
-
- if (IsNotSocketClosedByOtherSide(*res)) {
+ if (IsNotSocketClosedByOtherSide((*res)->Fd())) {
if (connList.Size() == 0) {
- //available connections exhausted - try create yet one (reserve)
TAutoPtr<IJob> job(new TConnector(addr));
-
if (c) {
try {
c->Executor()->Create(*job, "https-con");
@@ -508,7 +534,7 @@ namespace NNeh {
JobQueue()->Schedule(job);
}
}
- return new TConnection(res, true, addr);
+ return res;
}
}
@@ -528,36 +554,34 @@ namespace NNeh {
} catch (...) {
}
- TNetworkAddress::TIterator ait = addr->Addr.Begin();
- res.Reset(new TSocketHolder(NCoro::Socket(*ait)));
const TInstant now(TInstant::Now());
const TInstant deadline(now + TDuration::Seconds(10));
TDuration delay = TDuration::MilliSeconds(8);
- TInstant checkpoint = Min(deadline, now + delay);
- int ret = NCoro::ConnectD(c, *res, ait->ai_addr, ait->ai_addrlen, checkpoint);
+ TInstant checkpoint = Min(deadline, delay.ToDeadLine());
+
+ TNetworkAddress::TIterator ait = addr.Addr.Begin();
+ TSocketRef socket(new TSocketHolder(NCoro::Socket(*ait)));
+ int ret = NCoro::ConnectD(c, *socket, *ait, deadline);
+ res.Reset(new TConnectionHolder);
+ res->Reset(new TConnection(socket, addr));
if (ret) {
do {
if ((ret == ETIMEDOUT || ret == EINTR) && checkpoint < deadline) {
delay += delay;
checkpoint = Min(deadline, now + delay);
-
- TSocketRef res2;
-
+ TConnectionRef res2;
if (connList.Dequeue(&res2)) {
+ ActiveSockets.Inc();
CachedSockets.Dec();
-
- if (IsNotSocketClosedByOtherSide(*res2)) {
+ if (IsNotSocketClosedByOtherSide((*res2)->Fd())) {
try {
TAutoPtr<IJob> job(new TConnector(addr, res));
-
c->Executor()->Create(*job, "https-con");
Y_UNUSED(job.Release());
} catch (...) {
}
-
res = res2;
-
break;
}
}
@@ -567,22 +591,20 @@ namespace NNeh {
}
return nullptr;
}
- } while (ret = NCoro::PollD(c, *res, CONT_POLL_WRITE, checkpoint));
+ } while (ret = NCoro::PollD(c, (*res)->Fd(), CONT_POLL_WRITE, checkpoint));
}
-
- PrepareSocket(*res);
-
- return new TConnection(res, false, addr);
+ PrepareSocket((*res)->Fd());
+ return res;
}
- inline void Release(TConnection& conn) {
+ inline void Release(TConnectionRef conn) {
if (!ExceedHardLimit()) {
size_t maxConnId = MaxConnId_.load(std::memory_order_acquire);
- while (maxConnId < conn.Host->Id) {
+ while (maxConnId < (*conn)->Host.Id) {
MaxConnId_.compare_exchange_strong(
maxConnId,
- conn.Host->Id,
+ (*conn)->Host.Id,
std::memory_order_seq_cst,
std::memory_order_seq_cst);
maxConnId = MaxConnId_.load(std::memory_order_acquire);
@@ -591,7 +613,7 @@ namespace NNeh {
CachedSockets.Inc();
ActiveSockets.Dec();
- ConnList(conn.Host).Enqueue(conn.Socket);
+ ConnList((*conn)->Host).Enqueue(conn);
}
if (CachedSockets.Val() && ExceedSoftLimit()) {
@@ -659,7 +681,7 @@ namespace NNeh {
//try remove at least ExceedSoftLimit() oldest connections from cache
//вычисляем долю кеша, которую нужно почистить (в 256 долях) (но не менее 1/32 кеша)
size_t frac256 = Min(size_t(Max(size_t(256U / 32U), (ExceedSoftLimit() << 8) / (CachedSockets.Val() + 1))), (size_t)256U);
- TSocketRef tmp;
+ TConnectionRef tmp;
ui64 processed = 0;
for (size_t i = 0; i < MaxConnId_.load(std::memory_order_acquire) && !Shutdown_; i++) {
@@ -670,9 +692,9 @@ namespace NNeh {
if (!purgeCounter && qsize) {
if (qsize <= 2) {
- TSocketRef res;
+ TConnectionRef res;
if (tc.Dequeue(&res)) {
- if (IsNotSocketClosedByOtherSide(*res)) {
+ if (IsNotSocketClosedByOtherSide((*res)->Fd())) {
tc.Enqueue(res);
} else {
OnPurgeSocket(processed);
@@ -689,8 +711,8 @@ namespace NNeh {
}
}
- inline TConnList& ConnList(const TResolvedHost* addr) {
- return Lst_.Get(addr->Id);
+ inline TConnList& ConnList(const TResolvedHost& addr) {
+ return Lst_.Get(addr.Id);
}
inline size_t TotalSockets() const noexcept {
@@ -1181,6 +1203,16 @@ namespace NNeh {
{
}
+ void SetSsl(TSslHolder&& ssl) {
+ Ssl_ = std::move(ssl);
+ BIO_up_ref(*Connection_);
+ SSL_set_bio(Ssl_.Get(), *Connection_, *Connection_);
+ }
+
+ TSslHolder&& MoveSsl() {
+ return std::move(Ssl_);
+ }
+
void Handshake() override {
Ssl_.Reset(SSL_new(SslCtx_));
if (THttpsOptions::EnableSslClientDebug) {
@@ -1242,7 +1274,7 @@ namespace NNeh {
//TSslSessionHolder Session_;
};
- static TConnCache* SocketCache() {
+ static TConnCache* ConnectionCache() {
return Singleton<TConnCache>();
}
@@ -1295,20 +1327,24 @@ namespace NNeh {
}
TErrorRef error;
- THolder<TConnCache::TConnection> s(SocketCache()->Connect(c, Msg_.Addr, Addr_, &error));
- if (!s) {
+ TConnCache::TConnectionRef connection(ConnectionCache()->Connect(c, Msg_.Addr, *Addr_, &error));
+ if (!connection) {
Hndl_->NotifyError(error);
return;
}
- TSslClientIOStream io(TSslCtxClient::Instance(), Loc_, s->Fd(), Hndl_->CanceledPtr());
+ TSslClientIOStream io(TSslCtxClient::Instance(), Loc_, (*connection)->Fd(), Hndl_->CanceledPtr());
TContBIOWatcher w(io, c);
TString received;
THttpHeaders headers;
TString firstLine;
try {
- io.Handshake();
+ if ((*connection)->HasSsl()) {
+ io.SetSsl((*connection)->MoveSsl());
+ } else {
+ io.Handshake();
+ }
RequestData().SendTo(io);
Req_.Destroy();
error = ProcessRecv(io, &received, &headers, &firstLine);
@@ -1329,8 +1365,8 @@ namespace NNeh {
if (error) {
Hndl_->NotifyError(error, received, firstLine, headers);
} else {
- io.Shutdown();
- SocketCache()->Release(*s);
+ (*connection)->SetSsl(io.MoveSsl());
+ ConnectionCache()->Release(connection);
Hndl_->NotifyResponse(received, firstLine, headers);
}
}
@@ -1912,7 +1948,7 @@ namespace NNeh {
"invalid output fd limits; hardLimit=%" PRISZT ", softLimit=%" PRISZT,
hardLimit, softLimit);
- NHttps::SocketCache()->SetFdLimits(softLimit, hardLimit);
+ NHttps::ConnectionCache()->SetFdLimits(softLimit, hardLimit);
}
void SetHttpInputConnectionsLimits(size_t softLimit, size_t hardLimit) {