diff options
| author | babenko <[email protected]> | 2024-09-19 00:03:20 +0300 |
|---|---|---|
| committer | babenko <[email protected]> | 2024-09-19 00:16:35 +0300 |
| commit | ff8fb64ae74790de8eae8e3a3be7b7e204387590 (patch) | |
| tree | 3f11910b345f71bd82a531748c7154e4a0074ebb | |
| parent | d1a1efc07455cf12920c34ecc614573a978b3848 (diff) | |
Refactor and improve logging in HTTP client
commit_hash:f1cdaa7cad19759076e1635e041044afb79321b1
| -rw-r--r-- | yt/yt/core/bus/tcp/connection.cpp | 8 | ||||
| -rw-r--r-- | yt/yt/core/bus/tcp/connection.h | 2 | ||||
| -rw-r--r-- | yt/yt/core/crypto/tls.cpp | 101 | ||||
| -rw-r--r-- | yt/yt/core/http/client.cpp | 77 | ||||
| -rw-r--r-- | yt/yt/core/http/client.h | 10 | ||||
| -rw-r--r-- | yt/yt/core/http/connection_pool.cpp | 27 | ||||
| -rw-r--r-- | yt/yt/core/http/connection_reuse_helpers.h | 10 | ||||
| -rw-r--r-- | yt/yt/core/http/helpers.cpp | 2 | ||||
| -rw-r--r-- | yt/yt/core/http/http.h | 5 | ||||
| -rw-r--r-- | yt/yt/core/http/mock/http.h | 4 | ||||
| -rw-r--r-- | yt/yt/core/http/private.h | 8 | ||||
| -rw-r--r-- | yt/yt/core/http/public.h | 2 | ||||
| -rw-r--r-- | yt/yt/core/http/server.cpp | 34 | ||||
| -rw-r--r-- | yt/yt/core/http/stream.cpp | 20 | ||||
| -rw-r--r-- | yt/yt/core/http/stream.h | 16 | ||||
| -rw-r--r-- | yt/yt/core/http/unittests/http_ut.cpp | 13 | ||||
| -rw-r--r-- | yt/yt/core/net/connection.cpp | 47 | ||||
| -rw-r--r-- | yt/yt/core/net/connection.h | 6 | ||||
| -rw-r--r-- | yt/yt/core/net/dialer.cpp | 54 | ||||
| -rw-r--r-- | yt/yt/core/net/dialer.h | 10 | ||||
| -rw-r--r-- | yt/yt/core/net/public.h | 2 |
21 files changed, 260 insertions, 198 deletions
diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp index b71fbb9363b..83faf6cb42b 100644 --- a/yt/yt/core/bus/tcp/connection.cpp +++ b/yt/yt/core/bus/tcp/connection.cpp @@ -621,18 +621,18 @@ void TTcpConnection::ConnectSocket(const TNetworkAddress& address) DialerSession_->Dial(); } -void TTcpConnection::OnDialerFinished(const TErrorOr<SOCKET>& socketOrError) +void TTcpConnection::OnDialerFinished(const TErrorOr<TFileDescriptor>& fdOrError) { YT_LOG_DEBUG("Dialer finished"); DialerSession_.Reset(); - if (!socketOrError.IsOK()) { + if (!fdOrError.IsOK()) { Abort(TError( NBus::EErrorCode::TransportError, "Error connecting to %v", EndpointDescription_) - << socketOrError); + << fdOrError); return; } @@ -643,7 +643,7 @@ void TTcpConnection::OnDialerFinished(const TErrorOr<SOCKET>& socketOrError) return; } - Socket_ = socketOrError.Value(); + Socket_ = fdOrError.Value(); auto tosLevel = TosLevel_.load(); if (tosLevel != DefaultTosLevel) { diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h index d5d631f3b22..2948d20b676 100644 --- a/yt/yt/core/bus/tcp/connection.h +++ b/yt/yt/core/bus/tcp/connection.h @@ -297,7 +297,7 @@ private: int GetSocketPort(); void ConnectSocket(const NNet::TNetworkAddress& address); - void OnDialerFinished(const TErrorOr<SOCKET>& socketOrError); + void OnDialerFinished(const TErrorOr<TFileDescriptor>& fdOrError); void ResolveAddress(); void OnAddressResolveFinished(const TErrorOr<NNet::TNetworkAddress>& result); diff --git a/yt/yt/core/crypto/tls.cpp b/yt/yt/core/crypto/tls.cpp index ee3d1399b62..38779e21a4c 100644 --- a/yt/yt/core/crypto/tls.cpp +++ b/yt/yt/core/crypto/tls.cpp @@ -57,7 +57,7 @@ constexpr auto TlsBufferSize = 1_MB; struct TSslContextImpl : public TRefCounted { - SSL_CTX* Ctx = nullptr; + SSL_CTX* Context = nullptr; TSslContextImpl() { @@ -66,36 +66,36 @@ struct TSslContextImpl ~TSslContextImpl() { - if (Ctx) { - SSL_CTX_free(Ctx); + if (Context) { + SSL_CTX_free(Context); } - if (ActiveCtx_) { - SSL_CTX_free(ActiveCtx_); + if (ActiveContext_) { + SSL_CTX_free(ActiveContext_); } } void Reset() { - if (Ctx) { - SSL_CTX_free(Ctx); + if (Context) { + SSL_CTX_free(Context); } #if OPENSSL_VERSION_NUMBER >= 0x10100000L - Ctx = SSL_CTX_new(TLS_method()); - if (!Ctx) { + Context = SSL_CTX_new(TLS_method()); + if (!Context) { THROW_ERROR_EXCEPTION("SSL_CTX_new(TLS_method()) failed") << GetSslErrors(); } - if (SSL_CTX_set_min_proto_version(Ctx, TLS1_2_VERSION) == 0) { + if (SSL_CTX_set_min_proto_version(Context, TLS1_2_VERSION) == 0) { THROW_ERROR_EXCEPTION("SSL_CTX_set_min_proto_version failed") << GetSslErrors(); } - if (SSL_CTX_set_max_proto_version(Ctx, TLS1_2_VERSION) == 0) { + if (SSL_CTX_set_max_proto_version(Context, TLS1_2_VERSION) == 0) { THROW_ERROR_EXCEPTION("SSL_CTX_set_max_proto_version failed") << GetSslErrors(); } #else Ctx = SSL_CTX_new(TLSv1_2_method()); - if (!Ctx) { + if (!Context) { THROW_ERROR_EXCEPTION("SSL_CTX_new(TLSv1_2_method()) failed") << GetSslErrors(); } @@ -104,17 +104,17 @@ struct TSslContextImpl void Commit(TInstant time) { - SSL_CTX* oldCtx; - YT_ASSERT(Ctx); + SSL_CTX* oldContext; + YT_ASSERT(Context); { auto guard = WriterGuard(Lock_); - oldCtx = ActiveCtx_; - ActiveCtx_ = Ctx; - Ctx = nullptr; + oldContext = ActiveContext_; + ActiveContext_ = Context; + Context = nullptr; CommitTime_ = time; } - if (oldCtx) { - SSL_CTX_free(oldCtx); + if (oldContext) { + SSL_CTX_free(oldContext); } } @@ -127,19 +127,19 @@ struct TSslContextImpl SSL* NewSsl() { auto guard = ReaderGuard(Lock_); - YT_ASSERT(ActiveCtx_); - return SSL_new(ActiveCtx_); + YT_ASSERT(ActiveContext_); + return SSL_new(ActiveContext_); } bool IsActive(const SSL* ssl) { auto guard = ReaderGuard(Lock_); - return SSL_get_SSL_CTX(ssl) == ActiveCtx_; + return SSL_get_SSL_CTX(ssl) == ActiveContext_; } private: YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, Lock_); - SSL_CTX* ActiveCtx_ = nullptr; + SSL_CTX* ActiveContext_ = nullptr; TInstant CommitTime_; }; @@ -155,14 +155,14 @@ class TTlsConnection { public: TTlsConnection( - TSslContextImplPtr ctx, + TSslContextImplPtr context, IPollerPtr poller, IConnectionPtr connection) - : Ctx_(std::move(ctx)) + : Context_(std::move(context)) , Invoker_(CreateSerializedInvoker(poller->GetInvoker(), "crypto_tls_connection")) , Underlying_(std::move(connection)) { - Ssl_ = Ctx_->NewSsl(); + Ssl_ = Context_->NewSsl(); if (!Ssl_) { THROW_ERROR_EXCEPTION("SSL_new failed") << GetSslErrors(); @@ -229,14 +229,19 @@ public: return Underlying_->GetWriteByteCount(); } - const TNetworkAddress& LocalAddress() const override + TConnectionId GetId() const override + { + return Underlying_->GetId(); + } + + const TNetworkAddress& GetLocalAddress() const override { - return Underlying_->LocalAddress(); + return Underlying_->GetLocalAddress(); } - const TNetworkAddress& RemoteAddress() const override + const TNetworkAddress& GetRemoteAddress() const override { - return Underlying_->RemoteAddress(); + return Underlying_->GetRemoteAddress(); } TConnectionStatistics GetWriteStatistics() const override @@ -356,7 +361,7 @@ public: } private: - const TSslContextImplPtr Ctx_; + const TSslContextImplPtr Context_; const IInvokerPtr Invoker_; const IConnectionPtr Underlying_; @@ -570,18 +575,18 @@ public: TSslContextImplPtr ctx, IDialerPtr dialer, IPollerPtr poller) - : Ctx_(std::move(ctx)) + : Context_(std::move(ctx)) , Underlying_(std::move(dialer)) , Poller_(std::move(poller)) { } - TFuture<IConnectionPtr> Dial(const TNetworkAddress& remote, TDialerContextPtr context) override + TFuture<IConnectionPtr> Dial(const TNetworkAddress& remoteAddress, TDialerContextPtr context) override { - return Underlying_->Dial(remote) - .Apply(BIND([ctx = Ctx_, poller = Poller_, context = std::move(context)] (const IConnectionPtr& underlying) -> IConnectionPtr { + return Underlying_->Dial(remoteAddress) + .Apply(BIND([ctx = Context_, poller = Poller_, context = std::move(context)] (const IConnectionPtr& underlying) -> IConnectionPtr { auto connection = New<TTlsConnection>(ctx, poller, underlying); - if (context != nullptr && context->Host != std::nullopt) { - connection->SetHost(*(context->Host)); + if (context && context->Host) { + connection->SetHost(*context->Host); } connection->StartClient(); return connection; @@ -589,7 +594,7 @@ public: } private: - const TSslContextImplPtr Ctx_; + const TSslContextImplPtr Context_; const IDialerPtr Underlying_; const IPollerPtr Poller_; }; @@ -658,12 +663,12 @@ TInstant TSslContext::GetCommitTime() const void TSslContext::UseBuiltinOpenSslX509Store() { - SSL_CTX_set_cert_store(Impl_->Ctx, GetBuiltinOpenSslX509Store().Release()); + SSL_CTX_set_cert_store(Impl_->Context, GetBuiltinOpenSslX509Store().Release()); } void TSslContext::SetCipherList(const TString& list) { - if (SSL_CTX_set_cipher_list(Impl_->Ctx, list.data()) == 0) { + if (SSL_CTX_set_cipher_list(Impl_->Context, list.data()) == 0) { THROW_ERROR_EXCEPTION("SSL_CTX_set_cipher_list failed") << TErrorAttribute("cipher_list", list) << GetSslErrors(); @@ -672,7 +677,7 @@ void TSslContext::SetCipherList(const TString& list) void TSslContext::AddCertificateFromFile(const TString& path) { - if (SSL_CTX_use_certificate_file(Impl_->Ctx, path.c_str(), SSL_FILETYPE_PEM) != 1) { + if (SSL_CTX_use_certificate_file(Impl_->Context, path.c_str(), SSL_FILETYPE_PEM) != 1) { THROW_ERROR_EXCEPTION("SSL_CTX_use_certificate_file failed") << TErrorAttribute("path", path) << GetSslErrors(); @@ -681,7 +686,7 @@ void TSslContext::AddCertificateFromFile(const TString& path) void TSslContext::AddCertificateChainFromFile(const TString& path) { - if (SSL_CTX_use_certificate_chain_file(Impl_->Ctx, path.c_str()) != 1) { + if (SSL_CTX_use_certificate_chain_file(Impl_->Context, path.c_str()) != 1) { THROW_ERROR_EXCEPTION("SSL_CTX_use_certificate_chain_file failed") << TErrorAttribute("path", path) << GetSslErrors(); @@ -690,7 +695,7 @@ void TSslContext::AddCertificateChainFromFile(const TString& path) void TSslContext::AddPrivateKeyFromFile(const TString& path) { - if (SSL_CTX_use_PrivateKey_file(Impl_->Ctx, path.c_str(), SSL_FILETYPE_PEM) != 1) { + if (SSL_CTX_use_PrivateKey_file(Impl_->Context, path.c_str(), SSL_FILETYPE_PEM) != 1) { THROW_ERROR_EXCEPTION("SSL_CTX_use_PrivateKey_file failed") << TErrorAttribute("path", path) << GetSslErrors(); @@ -714,12 +719,12 @@ void TSslContext::AddCertificateChain(const TString& certificateChain) X509_free(certificateObject); }); - if (SSL_CTX_use_certificate(Impl_->Ctx, certificateObject) != 1) { + if (SSL_CTX_use_certificate(Impl_->Context, certificateObject) != 1) { THROW_ERROR_EXCEPTION("SSL_CTX_use_certificate failed") << GetSslErrors(); } - SSL_CTX_clear_chain_certs(Impl_->Ctx); + SSL_CTX_clear_chain_certs(Impl_->Context); while (true) { auto chainCertificateObject = PEM_read_bio_X509(bio, nullptr, nullptr, nullptr); if (!chainCertificateObject) { @@ -733,7 +738,7 @@ void TSslContext::AddCertificateChain(const TString& certificateChain) << GetSslErrors(); } - int result = SSL_CTX_add0_chain_cert(Impl_->Ctx, chainCertificateObject); + int result = SSL_CTX_add0_chain_cert(Impl_->Context, chainCertificateObject); if (!result) { X509_free(chainCertificateObject); THROW_ERROR_EXCEPTION("SSL_CTX_add0_chain_cert") @@ -759,7 +764,7 @@ void TSslContext::AddCertificate(const TString& certificate) X509_free(certificateObject); }); - if (SSL_CTX_use_certificate(Impl_->Ctx, certificateObject) != 1) { + if (SSL_CTX_use_certificate(Impl_->Context, certificateObject) != 1) { THROW_ERROR_EXCEPTION("SSL_CTX_use_certificate failed") << GetSslErrors(); } @@ -782,7 +787,7 @@ void TSslContext::AddPrivateKey(const TString& privateKey) EVP_PKEY_free(privateKeyObject); }); - if (SSL_CTX_use_PrivateKey(Impl_->Ctx, privateKeyObject) != 1) { + if (SSL_CTX_use_PrivateKey(Impl_->Context, privateKeyObject) != 1) { THROW_ERROR_EXCEPTION("SSL_CTX_use_PrivateKey failed") << GetSslErrors(); } diff --git a/yt/yt/core/http/client.cpp b/yt/yt/core/http/client.cpp index dc7bf3077df..d5e072025ef 100644 --- a/yt/yt/core/http/client.cpp +++ b/yt/yt/core/http/client.cpp @@ -12,6 +12,7 @@ #include <yt/yt/core/net/connection.h> #include <yt/yt/core/concurrency/poller.h> + #include <util/string/cast.h> namespace NYT::NHttp { @@ -26,13 +27,13 @@ class TClient { public: TClient( - const TClientConfigPtr& config, - const IDialerPtr& dialer, - const IInvokerPtr& invoker) - : Config_(config) - , Dialer_(dialer) - , Invoker_(invoker) - , ConnectionPool_(New<TConnectionPool>(dialer, config, invoker)) + TClientConfigPtr config, + IDialerPtr dialer, + IInvokerPtr invoker) + : Config_(std::move(config)) + , Dialer_(std::move(dialer)) + , Invoker_(std::move(invoker)) + , ConnectionPool_(New<TConnectionPool>(Dialer_, Config_, Invoker_)) { } TFuture<IResponsePtr> Get( @@ -109,7 +110,7 @@ private: const TClientConfigPtr Config_; const IDialerPtr Dialer_; const IInvokerPtr Invoker_; - TConnectionPoolPtr ConnectionPool_; + const TConnectionPoolPtr ConnectionPool_; static int GetDefaultPort(const TUrlRef& parsedUrl) { @@ -125,9 +126,8 @@ private: auto host = parsedUrl.Host; TNetworkAddress address; - auto tryIP = TNetworkAddress::TryParse(host); - if (tryIP.IsOK()) { - address = tryIP.Value(); + if (auto ipOrError = TNetworkAddress::TryParse(host); ipOrError.IsOK()) { + address = ipOrError.Value(); } else { auto asyncAddress = TAddressResolver::Get()->Resolve(ToString(host)); address = WaitFor(asyncAddress) @@ -137,15 +137,17 @@ private: return TNetworkAddress(address, parsedUrl.Port.value_or(GetDefaultPort(parsedUrl))); } - std::pair<THttpOutputPtr, THttpInputPtr> OpenHttp(const TUrlRef& urlRef) + std::pair<THttpOutputPtr, THttpInputPtr> Connect(const TUrlRef& urlRef) { auto context = New<TDialerContext>(); context->Host = urlRef.Host; + auto address = GetAddress(urlRef); // TODO(aleexfi): Enable connection pool by default if (Config_->MaxIdleConnections == 0) { - auto connection = WaitFor(Dialer_->Dial(address, std::move(context))).ValueOrThrow(); + auto connection = WaitFor(Dialer_->Dial(address, std::move(context))) + .ValueOrThrow(); auto input = New<THttpInput>( connection, @@ -161,9 +163,10 @@ private: return {std::move(output), std::move(input)}; } else { - auto connection = WaitFor(ConnectionPool_->Connect(address, std::move(context))).ValueOrThrow(); + auto connection = WaitFor(ConnectionPool_->Connect(address, std::move(context))) + .ValueOrThrow(); - auto reuseSharedState = New<NDetail::TReusableConnectionState>(connection, ConnectionPool_); + auto reusableState = New<NDetail::TReusableConnectionState>(connection, ConnectionPool_); auto input = New<NDetail::TConnectionReuseWrapper<THttpInput>>( connection, @@ -171,13 +174,13 @@ private: Invoker_, EMessageType::Response, Config_); - input->SetReusableState(reuseSharedState); + input->SetReusableState(reusableState); auto output = New<NDetail::TConnectionReuseWrapper<THttpOutput>>( connection, EMessageType::Request, Config_); - output->SetReusableState(reuseSharedState); + output->SetReusableState(reusableState); return {std::move(output), std::move(input)}; } @@ -186,7 +189,7 @@ private: template <typename T> TFuture<T> WrapError(const TString& url, TCallback<T()> action) { - return BIND([=, this_ = MakeStrong(this)] { + return BIND([=, this_ = MakeStrong(this), action = std::move(action)] { try { return action(); } catch (const std::exception& ex) { @@ -223,7 +226,7 @@ private: // Waits for response headers internally. Response_->GetStatusCode(); - return IResponsePtr{Response_}; + return IResponsePtr(Response_); })); } @@ -238,10 +241,10 @@ private: } private: - THttpOutputPtr Request_; - THttpInputPtr Response_; - TIntrusivePtr<TClient> Client_; - TString Url_; + const THttpOutputPtr Request_; + const THttpInputPtr Response_; + const TIntrusivePtr<TClient> Client_; + const TString Url_; }; std::pair<THttpOutputPtr, THttpInputPtr> StartAndWriteHeaders( @@ -254,7 +257,7 @@ private: auto urlRef = ParseUrl(url); - std::tie(request, response) = OpenHttp(urlRef); + std::tie(request, response) = Connect(urlRef); request->SetHost(urlRef.Host, urlRef.PortStr); if (headers) { @@ -276,7 +279,7 @@ private: { return WrapError(url, BIND([=, this, this_ = MakeStrong(this)] { auto [request, response] = StartAndWriteHeaders(method, url, headers); - return IActiveRequestPtr{New<TActiveRequest>(request, response, this_, url)}; + return IActiveRequestPtr(New<TActiveRequest>(request, response, this_, url)); })); } @@ -303,28 +306,32 @@ private: return DoRequest(method, *redirectUrl, body, headers, redirectCount + 1); } - return IResponsePtr(response); + return response; } }; //////////////////////////////////////////////////////////////////////////////// IClientPtr CreateClient( - const TClientConfigPtr& config, - const IDialerPtr& dialer, - const IInvokerPtr& invoker) + TClientConfigPtr config, + IDialerPtr dialer, + IInvokerPtr invoker) { - return New<TClient>(config, dialer, invoker); + return New<TClient>( + std::move(config), + std::move(dialer), + std::move(invoker)); } IClientPtr CreateClient( - const TClientConfigPtr& config, - const IPollerPtr& poller) + TClientConfigPtr config, + IPollerPtr poller) { + auto invoker = poller->GetInvoker(); return CreateClient( - config, - CreateDialer(New<TDialerConfig>(), poller, HttpLogger()), - poller->GetInvoker()); + std::move(config), + CreateDialer(New<TDialerConfig>(), std::move(poller), HttpLogger()), + std::move(invoker)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/http/client.h b/yt/yt/core/http/client.h index a75df31142e..27457626332 100644 --- a/yt/yt/core/http/client.h +++ b/yt/yt/core/http/client.h @@ -78,12 +78,12 @@ DEFINE_REFCOUNTED_TYPE(IClient) //////////////////////////////////////////////////////////////////////////////// IClientPtr CreateClient( - const TClientConfigPtr& config, - const NNet::IDialerPtr& dialer, - const IInvokerPtr& invoker); + TClientConfigPtr config, + NNet::IDialerPtr dialer, + IInvokerPtr invoker); IClientPtr CreateClient( - const TClientConfigPtr& config, - const NConcurrency::IPollerPtr& poller); + TClientConfigPtr config, + NConcurrency::IPollerPtr poller); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/http/connection_pool.cpp b/yt/yt/core/http/connection_pool.cpp index a14891164c1..24dc08d5b23 100644 --- a/yt/yt/core/http/connection_pool.cpp +++ b/yt/yt/core/http/connection_pool.cpp @@ -1,5 +1,7 @@ #include "connection_pool.h" +#include "private.h" + #include <yt/yt/core/concurrency/periodic_executor.h> #include <yt/yt/core/net/connection.h> @@ -11,6 +13,10 @@ using namespace NConcurrency; //////////////////////////////////////////////////////////////////////////////// +static constexpr auto& Logger = HttpLogger; + +//////////////////////////////////////////////////////////////////////////////// + TDuration TIdleConnection::GetIdleTime() const { return TInstant::Now() - InsertionTime; @@ -56,7 +62,11 @@ TFuture<IConnectionPtr> TConnectionPool::Connect( while (auto item = Connections_.Extract(address)) { if (item->GetIdleTime() < Config_->ConnectionIdleTimeout && item->IsOK()) { - return MakeFuture<IConnectionPtr>(std::move(item->Connection)); + auto&& connection = item->Connection; + YT_LOG_DEBUG("Connection is extracted from cache (Address: %v, ConnectionId: %v)", + address, + connection->GetId()); + return MakeFuture<IConnectionPtr>(std::move(connection)); } } } @@ -66,21 +76,28 @@ TFuture<IConnectionPtr> TConnectionPool::Connect( void TConnectionPool::Release(const IConnectionPtr& connection) { + YT_LOG_DEBUG("Connection is put to cache (Address: %v, ConnectionId: %v)", + connection->GetRemoteAddress(), + connection->GetId()); + auto guard = Guard(SpinLock_); - Connections_.Insert(connection->RemoteAddress(), {connection, TInstant::Now()}); + Connections_.Insert(connection->GetRemoteAddress(), {connection, TInstant::Now()}); } void TConnectionPool::DropExpiredConnections() { auto guard = Guard(SpinLock_); - TMultiLruCache<TNetworkAddress, TIdleConnection> validConnections( - Config_->MaxIdleConnections); + decltype(Connections_) validConnections(Config_->MaxIdleConnections); while (Connections_.GetSize() > 0) { auto idleConnection = Connections_.Pop(); if (idleConnection.GetIdleTime() < Config_->ConnectionIdleTimeout && idleConnection.IsOK()) { - validConnections.Insert(idleConnection.Connection->RemoteAddress(), idleConnection); + validConnections.Insert(idleConnection.Connection->GetRemoteAddress(), idleConnection); + } else { + YT_LOG_DEBUG("Connection expired from cache (Address: %v, ConnectionId: %v)", + idleConnection.Connection->GetRemoteAddress(), + idleConnection.Connection->GetId()); } } diff --git a/yt/yt/core/http/connection_reuse_helpers.h b/yt/yt/core/http/connection_reuse_helpers.h index bb3762bb6d6..5d7039ebad8 100644 --- a/yt/yt/core/http/connection_reuse_helpers.h +++ b/yt/yt/core/http/connection_reuse_helpers.h @@ -1,6 +1,6 @@ #pragma once -#include "public.h" +#include "private.h" #include <yt/yt/core/net/public.h> @@ -9,7 +9,7 @@ namespace NYT::NHttp::NDetail { //////////////////////////////////////////////////////////////////////////////// //! Responsible for returning the connection to the owning pool -//! if it could be reused +//! if it could be reused. struct TReusableConnectionState final { std::atomic<bool> Reusable = true; @@ -20,10 +20,12 @@ struct TReusableConnectionState final ~TReusableConnectionState(); }; -using TReusableConnectionStatePtr = TIntrusivePtr<TReusableConnectionState>; +DEFINE_REFCOUNTED_TYPE(TReusableConnectionState) + +//////////////////////////////////////////////////////////////////////////////// //! Reports to the shared state whether the connection could be reused -//! (by calling T::IsSafeToReuse() in the destructor) +//! (by calling T::IsSafeToReuse() in the destructor). template <class T> class TConnectionReuseWrapper : public T diff --git a/yt/yt/core/http/helpers.cpp b/yt/yt/core/http/helpers.cpp index 96a9c672450..e5bead9511b 100644 --- a/yt/yt/core/http/helpers.cpp +++ b/yt/yt/core/http/helpers.cpp @@ -413,7 +413,7 @@ bool TryParseTraceParent(const TString& traceParent, NTracing::TSpanContext& spa // Now we have exactly three parts: traceId-spanId-options. // Parse trace context. - if (!TGuid::FromStringHex32(parts[0], &spanContext.TraceId)) { + if (!NTracing::TTraceId::FromStringHex32(parts[0], &spanContext.TraceId)) { return false; } diff --git a/yt/yt/core/http/http.h b/yt/yt/core/http/http.h index afd6c061f61..0de762c235e 100644 --- a/yt/yt/core/http/http.h +++ b/yt/yt/core/http/http.h @@ -233,8 +233,9 @@ struct IRequest virtual const NNet::TNetworkAddress& GetRemoteAddress() const = 0; - virtual TGuid GetConnectionId() const = 0; - virtual TGuid GetRequestId() const = 0; + virtual TConnectionId GetConnectionId() const = 0; + virtual TRequestId GetRequestId() const = 0; + virtual i64 GetReadByteCount() const = 0; virtual TInstant GetStartTime() const = 0; diff --git a/yt/yt/core/http/mock/http.h b/yt/yt/core/http/mock/http.h index 27e62e561fb..84ee0356ea8 100644 --- a/yt/yt/core/http/mock/http.h +++ b/yt/yt/core/http/mock/http.h @@ -21,8 +21,8 @@ public: MOCK_METHOD(const TUrlRef&, GetUrl, (), (override)); MOCK_METHOD(const THeadersPtr&, GetHeaders, (), (override)); MOCK_METHOD(const NNet::TNetworkAddress&, GetRemoteAddress, (), (const, override)); - MOCK_METHOD(TGuid, GetConnectionId, (), (const, override)); - MOCK_METHOD(TGuid, GetRequestId, (), (const, override)); + MOCK_METHOD(TConnectionId, GetConnectionId, (), (const, override)); + MOCK_METHOD(TRequestId, GetRequestId, (), (const, override)); MOCK_METHOD(i64, GetReadByteCount, (), (const, override)); MOCK_METHOD(TInstant, GetStartTime, (), (const, override)); MOCK_METHOD(int, GetPort, (), (const, override)); diff --git a/yt/yt/core/http/private.h b/yt/yt/core/http/private.h index 691d022bbd9..a1563033503 100644 --- a/yt/yt/core/http/private.h +++ b/yt/yt/core/http/private.h @@ -20,4 +20,12 @@ DECLARE_REFCOUNTED_CLASS(THttpOutput) //////////////////////////////////////////////////////////////////////////////// +namespace NDetail { + +DECLARE_REFCOUNTED_STRUCT(TReusableConnectionState) + +} // namespace NDetail + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NHttp diff --git a/yt/yt/core/http/public.h b/yt/yt/core/http/public.h index 74f0e8d29d6..b904f89675c 100644 --- a/yt/yt/core/http/public.h +++ b/yt/yt/core/http/public.h @@ -33,6 +33,8 @@ DECLARE_REFCOUNTED_CLASS(TSharedRefOutputStream) //////////////////////////////////////////////////////////////////////////////// using TContentEncoding = TString; +using TConnectionId = TGuid; +using TRequestId = TGuid; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/http/server.cpp b/yt/yt/core/http/server.cpp index 74648033071..ae7df4b6204 100644 --- a/yt/yt/core/http/server.cpp +++ b/yt/yt/core/http/server.cpp @@ -165,20 +165,19 @@ private: ConnectionsDropped_.Increment(); ActiveConnections_--; YT_LOG_WARNING("Server is over max active connection limit (RemoteAddress: %v)", - connection->RemoteAddress()); + connection->GetRemoteAddress()); return; } ConnectionsActive_.Update(count); ConnectionsAccepted_.Increment(); - auto connectionId = TGuid::Create(); YT_LOG_DEBUG("Connection accepted (ConnectionId: %v, RemoteAddress: %v, LocalAddress: %v)", - connectionId, - connection->RemoteAddress(), - connection->LocalAddress()); + connection->GetId(), + connection->GetRemoteAddress(), + connection->GetLocalAddress()); Invoker_->Invoke( - BIND(&TServer::HandleConnection, MakeStrong(this), std::move(connection), connectionId)); + BIND(&TServer::HandleConnection, MakeStrong(this), std::move(connection))); } bool HandleRequest(const THttpInputPtr& request, const THttpOutputPtr& response) @@ -271,10 +270,10 @@ private: return true; } - void HandleConnection(const IConnectionPtr& connection, TGuid connectionId) + void HandleConnection(const IConnectionPtr& connection) { try { - connection->SubscribePeerDisconnect(BIND([config = Config_, canceler = GetCurrentFiberCanceler(), connectionId = connectionId] { + connection->SubscribePeerDisconnect(BIND([config = Config_, canceler = GetCurrentFiberCanceler(), connectionId = connection->GetId()] { YT_LOG_DEBUG("Client closed TCP socket (ConnectionId: %v)", connectionId); if (config->CancelFiberOnConnectionClose.value_or(false)) { @@ -291,17 +290,17 @@ private: connection->SetNoDelay(); } - DoHandleConnection(connection, connectionId); + DoHandleConnection(connection); } catch (const std::exception& ex) { - YT_LOG_ERROR(ex, "Unhandled exception (ConnectionId: %v)", connectionId); + YT_LOG_ERROR(ex, "Unhandled exception (ConnectionId: %v)", connection->GetId()); } } - void DoHandleConnection(const IConnectionPtr& connection, TGuid connectionId) + void DoHandleConnection(const IConnectionPtr& connection) { auto request = New<THttpInput>( connection, - connection->RemoteAddress(), + connection->GetRemoteAddress(), GetCurrentInvoker(), EMessageType::Request, Config_); @@ -317,11 +316,8 @@ private: EMessageType::Response, Config_); - request->SetConnectionId(connectionId); - response->SetConnectionId(connectionId); - while (true) { - auto requestId = TGuid::Create(); + auto requestId = TRequestId::Create(); request->SetRequestId(requestId); response->SetRequestId(requestId); @@ -332,7 +328,7 @@ private: auto logDrop = [&] (auto reason) { YT_LOG_DEBUG("Dropping HTTP connection (ConnectionId: %v, Reason: %v)", - connectionId, + connection->GetId(), reason); }; @@ -381,10 +377,10 @@ private: auto connectionResult = WaitFor(connection->Close()); if (connectionResult.IsOK()) { YT_LOG_DEBUG("HTTP connection closed (ConnectionId: %v)", - connectionId); + connection->GetId()); } else { YT_LOG_DEBUG(connectionResult, "Error closing HTTP connection (ConnectionId: %v)", - connectionId); + connection->GetId()); } } }; diff --git a/yt/yt/core/http/stream.cpp b/yt/yt/core/http/stream.cpp index db16ffdf53a..632e035a4c9 100644 --- a/yt/yt/core/http/stream.cpp +++ b/yt/yt/core/http/stream.cpp @@ -295,22 +295,17 @@ const TNetworkAddress& THttpInput::GetRemoteAddress() const return RemoteAddress_; } -TGuid THttpInput::GetConnectionId() const +TConnectionId THttpInput::GetConnectionId() const { - return ConnectionId_; + return Connection_->GetId(); } -void THttpInput::SetConnectionId(TGuid connectionId) -{ - ConnectionId_ = connectionId; -} - -TGuid THttpInput::GetRequestId() const +TRequestId THttpInput::GetRequestId() const { return RequestId_; } -void THttpInput::SetRequestId(TGuid requestId) +void THttpInput::SetRequestId(TRequestId requestId) { RequestId_ = requestId; } @@ -633,12 +628,7 @@ void THttpOutput::Reset() Trailers_.Reset(); } -void THttpOutput::SetConnectionId(TGuid connectionId) -{ - ConnectionId_ = connectionId; -} - -void THttpOutput::SetRequestId(TGuid requestId) +void THttpOutput::SetRequestId(TRequestId requestId) { RequestId_ = requestId; } diff --git a/yt/yt/core/http/stream.h b/yt/yt/core/http/stream.h index b01da702f16..9efcea7f7a0 100644 --- a/yt/yt/core/http/stream.h +++ b/yt/yt/core/http/stream.h @@ -105,11 +105,10 @@ public: const NNet::TNetworkAddress& GetRemoteAddress() const override; - TGuid GetConnectionId() const override; - void SetConnectionId(TGuid connectionId); + TConnectionId GetConnectionId() const override; - TGuid GetRequestId() const override; - void SetRequestId(TGuid requestId); + TRequestId GetRequestId() const override; + void SetRequestId(TRequestId requestId); i64 GetReadByteCount() const override; @@ -149,8 +148,7 @@ private: THeadersPtr Headers_; // Debug. - TGuid ConnectionId_; - TGuid RequestId_; + TRequestId RequestId_; i64 StartByteCount_ = 0; NNet::TConnectionStatistics StartStatistics_; TInstant LastProgressLogTime_; @@ -215,8 +213,7 @@ public: bool IsSafeToReuse() const; void Reset(); - void SetConnectionId(TGuid connectionId); - void SetRequestId(TGuid requestId); + void SetRequestId(TRequestId requestId); i64 GetWriteByteCount() const override; @@ -228,8 +225,7 @@ private: TClosure OnWriteFinish_; //! Debugging. - TGuid ConnectionId_; - TGuid RequestId_; + TRequestId RequestId_; i64 StartByteCount_ = 0; NNet::TConnectionStatistics StartStatistics_; bool HeadersLogged_ = false; diff --git a/yt/yt/core/http/unittests/http_ut.cpp b/yt/yt/core/http/unittests/http_ut.cpp index e5bc967d730..cda7dea65f8 100644 --- a/yt/yt/core/http/unittests/http_ut.cpp +++ b/yt/yt/core/http/unittests/http_ut.cpp @@ -159,6 +159,11 @@ struct TFakeConnection TString Input; TString Output; + TConnectionId GetId() const override + { + return {}; + } + bool SetNoDelay() override { return true; @@ -216,12 +221,12 @@ struct TFakeConnection THROW_ERROR_EXCEPTION("Not implemented"); } - const TNetworkAddress& LocalAddress() const override + const TNetworkAddress& GetLocalAddress() const override { THROW_ERROR_EXCEPTION("Not implemented"); } - const TNetworkAddress& RemoteAddress() const override + const TNetworkAddress& GetRemoteAddress() const override { THROW_ERROR_EXCEPTION("Not implemented"); } @@ -1149,7 +1154,7 @@ TEST_P(THttpServerTest, ConnectionKeepAlive) auto response = New<THttpInput>( connection, - connection->RemoteAddress(), + connection->GetRemoteAddress(), Poller->GetInvoker(), EMessageType::Response, New<THttpIOConfig>()); @@ -1183,7 +1188,7 @@ TEST_P(THttpServerTest, ConnectionKeepAlive) auto response = New<THttpInput>( connection, - connection->RemoteAddress(), + connection->GetRemoteAddress(), Poller->GetInvoker(), EMessageType::Response, New<THttpIOConfig>()); diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp index c18c6195c49..fa0c88d4d29 100644 --- a/yt/yt/core/net/connection.cpp +++ b/yt/yt/core/net/connection.cpp @@ -628,8 +628,7 @@ public: TFuture<void> Close() { - auto error = TError("Connection closed") - << TErrorAttribute("connection", Name_); + auto error = AnnotateError(TError("Connection closed")); return AbortIO(error); } @@ -665,12 +664,17 @@ public: return future; } - const TNetworkAddress& LocalAddress() const + TConnectionId GetId() const + { + return Id_; + } + + const TNetworkAddress& GetLocalAddress() const { return LocalAddress_; } - const TNetworkAddress& RemoteAddress() const + const TNetworkAddress& GetRemoteAddress() const { return RemoteAddress_; } @@ -746,7 +750,8 @@ public: } private: - const TString Name_; + const TConnectionId Id_ = TConnectionId::Create(); + const TString Endpoint_; const TString LoggingTag_; const TNetworkAddress LocalAddress_; const TNetworkAddress RemoteAddress_; @@ -768,7 +773,8 @@ private: TString filePath, const IPollerPtr& poller, bool useDeliveryFence) - : Name_(Format("File{%v}", filePath)) + : Endpoint_(Format("File{%v}", filePath)) + , LoggingTag_(Format("ConnectionId: %v %v", Id_, Endpoint_)) , FD_(fd) , Poller_(std::move(poller)) , UseDeliveryFence_(useDeliveryFence) @@ -780,8 +786,8 @@ private: const TNetworkAddress& localAddress, const TNetworkAddress& remoteAddress, IPollerPtr poller) - : Name_(Format("FD{%v<->%v}", localAddress, remoteAddress)) - , LoggingTag_(Format("ConnectionId: %v", Name_)) + : Endpoint_(Format("FD{%v<->%v}", localAddress, remoteAddress)) + , LoggingTag_(Format("ConnectionId: %v %v", Id_, Endpoint_)) , LocalAddress_(localAddress) , RemoteAddress_(remoteAddress) , FD_(fd) @@ -894,6 +900,13 @@ private: TDelayedExecutorCookie ReadTimeoutCookie_; TDelayedExecutorCookie WriteTimeoutCookie_; + TError AnnotateError(const TError& error) const + { + return error + << TErrorAttribute("connection_id", Id_) + << TErrorAttribute("connection_endpoint", Endpoint_); + } + TFuture<void> DoWrite(const TSharedRef& data) { auto write = std::make_unique<TWriteOperation>(data); @@ -959,8 +972,7 @@ private: if (error.IsOK()) { if (direction->Operation) { - THROW_ERROR_EXCEPTION("Another IO operation is in progress") - << TErrorAttribute("connection", Name_); + THROW_ERROR(AnnotateError(TError("Another IO operation is in progress"))); } YT_VERIFY(!direction->Running); @@ -1010,7 +1022,7 @@ private: if (result.IsOK()) { direction->BytesTransferred += result.Value().ByteCount; } else { - result = result << TErrorAttribute("connection", Name_); + result = AnnotateError(result); } bool needUnregister = false; @@ -1154,14 +1166,19 @@ public: YT_UNUSED_FUTURE(Impl_->Abort(TError("Connection is abandoned"))); } - const TNetworkAddress& LocalAddress() const override + TConnectionId GetId() const override + { + return Impl_->GetId(); + } + + const TNetworkAddress& GetLocalAddress() const override { - return Impl_->LocalAddress(); + return Impl_->GetLocalAddress(); } - const TNetworkAddress& RemoteAddress() const override + const TNetworkAddress& GetRemoteAddress() const override { - return Impl_->RemoteAddress(); + return Impl_->GetRemoteAddress(); } int GetHandle() const override diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h index 24b28872d5e..894b7482220 100644 --- a/yt/yt/core/net/connection.h +++ b/yt/yt/core/net/connection.h @@ -68,8 +68,10 @@ struct IConnection : public IConnectionReader , public IConnectionWriter { - virtual const TNetworkAddress& LocalAddress() const = 0; - virtual const TNetworkAddress& RemoteAddress() const = 0; + virtual TConnectionId GetId() const = 0; + + virtual const TNetworkAddress& GetLocalAddress() const = 0; + virtual const TNetworkAddress& GetRemoteAddress() const = 0; // Returns true if connection is not is failed state and has no // active IO operations. diff --git a/yt/yt/core/net/dialer.cpp b/yt/yt/core/net/dialer.cpp index d21b9b01402..5bc8d3db92d 100644 --- a/yt/yt/core/net/dialer.cpp +++ b/yt/yt/core/net/dialer.cpp @@ -1,6 +1,7 @@ #include "dialer.h" #include "connection.h" #include "config.h" +#include "private.h" #include <yt/yt/core/concurrency/pollable_detail.h> @@ -15,6 +16,10 @@ using namespace NConcurrency; //////////////////////////////////////////////////////////////////////////////// +static constexpr auto& Logger = NetLogger; + +//////////////////////////////////////////////////////////////////////////////// + class TDialSession : public TRefCounted { @@ -23,48 +28,55 @@ public: const TNetworkAddress& remoteAddress, const IAsyncDialerPtr& asyncDialer, IPollerPtr poller) - : Name_(Format("dialer[%v]", remoteAddress)) - , RemoteAddress_(remoteAddress) + : RemoteAddress_(remoteAddress) , Poller_(std::move(poller)) , Session_(asyncDialer->CreateSession( remoteAddress, BIND(&TDialSession::OnDialerFinished, MakeWeak(this)))) + { } + + TFuture<IConnectionPtr> Run() { + YT_LOG_DEBUG("Dial started (Address: %v)", + RemoteAddress_); + Session_->Dial(); Promise_.OnCanceled(BIND([this, this_ = MakeStrong(this)] (const TError& error) { Promise_.TrySet(TError(NYT::EErrorCode::Canceled, "Dial canceled") - << TErrorAttribute("dialer", Name_) + << TErrorAttribute("remote_address", ToString(RemoteAddress_)) << error); })); - } - TFuture<IConnectionPtr> GetFuture() const - { return Promise_.ToFuture(); } private: - const TString Name_; const TNetworkAddress RemoteAddress_; const IPollerPtr Poller_; const IAsyncDialerSessionPtr Session_; const TPromise<IConnectionPtr> Promise_ = NewPromise<IConnectionPtr>(); - void OnDialerFinished(const TErrorOr<SOCKET>& socketOrError) + void OnDialerFinished(const TErrorOr<TFileDescriptor>& fdOrError) { - if (socketOrError.IsOK()) { - auto socket = socketOrError.Value(); - Promise_.TrySet(CreateConnectionFromFD( - socket, - GetSocketName(socket), - RemoteAddress_, - Poller_)); - } else { - Promise_.TrySet(socketOrError - << TErrorAttribute("dialer", Name_)); + if (!fdOrError.IsOK()) { + Promise_.TrySet(TError("Dial failed") + << TErrorAttribute("remote_address", ToString(RemoteAddress_)) + << fdOrError); + return; } + + auto socket = fdOrError.Value(); + YT_LOG_DEBUG("Dial completed (Address: %v, FD: %v)", + RemoteAddress_, + socket); + + Promise_.TrySet(CreateConnectionFromFD( + socket, + GetSocketName(socket), + RemoteAddress_, + Poller_)); } }; @@ -86,14 +98,14 @@ public: { } TFuture<IConnectionPtr> Dial( - const TNetworkAddress& remote, + const TNetworkAddress& remoteAddress, TDialerContextPtr /*context*/) override { auto session = New<TDialSession>( - remote, + remoteAddress, AsyncDialer_, Poller_); - return session->GetFuture(); + return session->Run(); } private: diff --git a/yt/yt/core/net/dialer.h b/yt/yt/core/net/dialer.h index b48c782cf5e..ac145d28a8d 100644 --- a/yt/yt/core/net/dialer.h +++ b/yt/yt/core/net/dialer.h @@ -4,6 +4,8 @@ #include <yt/yt/core/net/address.h> +#include <yt/yt/core/misc/proc.h> + #include <library/cpp/yt/logging/public.h> #include <library/cpp/yt/memory/ref.h> @@ -14,8 +16,7 @@ namespace NYT::NNet { //////////////////////////////////////////////////////////////////////////////// -struct TDialerContext - : public TRefCounted +struct TDialerContext final { //! Host is used for TlsDialer. std::optional<TString> Host; @@ -24,12 +25,11 @@ struct TDialerContext DEFINE_REFCOUNTED_TYPE(TDialerContext) //! Dialer establishes connection to a (resolved) network address. - struct IDialer : public virtual TRefCounted { virtual TFuture<IConnectionPtr> Dial( - const TNetworkAddress& remote, + const TNetworkAddress& remoteAddress, TDialerContextPtr context = nullptr) = 0; }; @@ -43,7 +43,7 @@ IDialerPtr CreateDialer( //////////////////////////////////////////////////////////////////////////////// //! Async dialer notifies caller via callback for better performance. -using TAsyncDialerCallback = TCallback<void(const TErrorOr<SOCKET>&)>; +using TAsyncDialerCallback = TCallback<void(const TErrorOr<TFileDescriptor>&)>; //! Dialer session interface. //! Caller should hold a reference to a session until callback is called. diff --git a/yt/yt/core/net/public.h b/yt/yt/core/net/public.h index b28a2f65d21..fde7eea47a6 100644 --- a/yt/yt/core/net/public.h +++ b/yt/yt/core/net/public.h @@ -12,6 +12,8 @@ class TNetworkAddress; class TIP6Address; class TIP6Network; +using TConnectionId = TGuid; + DECLARE_REFCOUNTED_STRUCT(IConnection) DECLARE_REFCOUNTED_STRUCT(IPacketConnection) DECLARE_REFCOUNTED_STRUCT(IConnectionReader) |
