diff options
author | babenko <babenko@yandex-team.com> | 2024-09-30 22:50:52 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2024-09-30 23:01:01 +0300 |
commit | d19bc73d3c420fac320c383e649321a318c56b82 (patch) | |
tree | 84de53a7b0451679447105c96ee3a5be9cf17677 /yt | |
parent | 803c95f77d7e098750be07c125e78f892ec7c169 (diff) | |
download | ydb-d19bc73d3c420fac320c383e649321a318c56b82.tar.gz |
Cosmetics and better logging HTTP input/output
commit_hash:78c959d8b1d506e78fa0d49b89525db73487a75c
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/core/http/connection_pool.cpp | 65 | ||||
-rw-r--r-- | yt/yt/core/http/connection_pool.h | 25 | ||||
-rw-r--r-- | yt/yt/core/http/connection_reuse_helpers.h | 2 | ||||
-rw-r--r-- | yt/yt/core/http/stream.cpp | 143 | ||||
-rw-r--r-- | yt/yt/core/http/stream.h | 40 |
5 files changed, 157 insertions, 118 deletions
diff --git a/yt/yt/core/http/connection_pool.cpp b/yt/yt/core/http/connection_pool.cpp index 27e6965067..8c3c0fd4fa 100644 --- a/yt/yt/core/http/connection_pool.cpp +++ b/yt/yt/core/http/connection_pool.cpp @@ -17,12 +17,12 @@ static constexpr auto& Logger = HttpLogger; //////////////////////////////////////////////////////////////////////////////// -TDuration TIdleConnection::GetIdleTime() const +TDuration TConnectionPool::TPooledConnection::GetIdleTime() const { return TInstant::Now() - InsertionTime; } -bool TIdleConnection::IsOK() const +bool TConnectionPool::TPooledConnection::IsValid() const { return Connection->IsIdle(); } @@ -35,8 +35,8 @@ TConnectionPool::TConnectionPool( IInvokerPtr invoker) : Dialer_(std::move(dialer)) , Config_(std::move(config)) - , Connections_(Config_->MaxIdleConnections) - , ExpiredConnectionsCollector_( + , Cache_(Config_->MaxIdleConnections) + , ExpirationExecutor_( New<TPeriodicExecutor>( std::move(invoker), BIND(&TConnectionPool::DropExpiredConnections, MakeWeak(this)), @@ -44,13 +44,13 @@ TConnectionPool::TConnectionPool( Config_->ConnectionIdleTimeout))) { if (Config_->MaxIdleConnections > 0) { - ExpiredConnectionsCollector_->Start(); + ExpirationExecutor_->Start(); } } TConnectionPool::~TConnectionPool() { - YT_UNUSED_FUTURE(ExpiredConnectionsCollector_->Stop()); + YT_UNUSED_FUTURE(ExpirationExecutor_->Stop()); } TFuture<IConnectionPtr> TConnectionPool::Connect( @@ -59,12 +59,10 @@ TFuture<IConnectionPtr> TConnectionPool::Connect( { { auto guard = Guard(SpinLock_); - - while (auto item = Connections_.TryExtract(address)) { - if (item->GetIdleTime() < Config_->ConnectionIdleTimeout && item->IsOK()) { - auto&& connection = item->Connection; - YT_LOG_DEBUG("Connection is extracted from cache (Address: %v, ConnectionId: %v)", - address, + while (auto pooledConnection = Cache_.TryExtract(address)) { + if (CheckPooledConnection(*pooledConnection)) { + auto connection = std::move(pooledConnection->Connection); + YT_LOG_DEBUG("Connection is extracted from cache (ConnectionId: %v)", connection->GetId()); return MakeFuture<IConnectionPtr>(std::move(connection)); } @@ -76,32 +74,47 @@ TFuture<IConnectionPtr> TConnectionPool::Connect( void TConnectionPool::Release(const IConnectionPtr& connection) { - YT_LOG_DEBUG("Connection is put to cache (Address: %v, ConnectionId: %v)", - connection->GetRemoteAddress(), + YT_LOG_DEBUG("Connection is put to cache (ConnectionId: %v)", connection->GetId()); - auto guard = Guard(SpinLock_); - Connections_.Insert(connection->GetRemoteAddress(), {connection, TInstant::Now()}); + { + auto guard = Guard(SpinLock_); + Cache_.Insert(connection->GetRemoteAddress(), {connection, TInstant::Now()}); + } +} + +bool TConnectionPool::CheckPooledConnection(const TPooledConnection& pooledConnection) +{ + auto idleTime = pooledConnection.GetIdleTime(); + if (idleTime > Config_->ConnectionIdleTimeout) { + YT_LOG_DEBUG("Connection evicted from cache due to idle timeout (ConnectionId: %v)", + pooledConnection.Connection->GetId()); + return false; + } + + if (!pooledConnection.IsValid()) { + YT_LOG_DEBUG("Connection evicted from cache due to invalid state (ConnectionId: %v)", + pooledConnection.Connection->GetId()); + return false; + } + + return true; } void TConnectionPool::DropExpiredConnections() { auto guard = Guard(SpinLock_); - decltype(Connections_) validConnections(Config_->MaxIdleConnections); + decltype(Cache_) newCache(Config_->MaxIdleConnections); - while (Connections_.GetSize() > 0) { - auto idleConnection = Connections_.Pop(); - if (idleConnection.GetIdleTime() < Config_->ConnectionIdleTimeout && idleConnection.IsOK()) { - validConnections.Insert(idleConnection.Connection->GetRemoteAddress(), idleConnection); - } else { - YT_LOG_DEBUG("Connection expired from cache (Address: %v, ConnectionId: %v)", - idleConnection.Connection->GetRemoteAddress(), - idleConnection.Connection->GetId()); + while (Cache_.GetSize() > 0) { + auto pooledConnection = Cache_.Pop(); + if (CheckPooledConnection(pooledConnection)) { + newCache.Insert(pooledConnection.Connection->GetRemoteAddress(), pooledConnection); } } - Connections_ = std::move(validConnections); + Cache_ = std::move(newCache); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/http/connection_pool.h b/yt/yt/core/http/connection_pool.h index cacde74e14..d977f58efb 100644 --- a/yt/yt/core/http/connection_pool.h +++ b/yt/yt/core/http/connection_pool.h @@ -16,17 +16,6 @@ namespace NYT::NHttp { //////////////////////////////////////////////////////////////////////////////// -struct TIdleConnection -{ - NNet::IConnectionPtr Connection; - TInstant InsertionTime; - - TDuration GetIdleTime() const; - bool IsOK() const; -}; - -//////////////////////////////////////////////////////////////////////////////// - class TConnectionPool : public TRefCounted { @@ -48,10 +37,20 @@ private: const NNet::IDialerPtr Dialer_; const TClientConfigPtr Config_; + struct TPooledConnection + { + NNet::IConnectionPtr Connection; + TInstant InsertionTime; + + TDuration GetIdleTime() const; + bool IsValid() const; + }; + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); - TMultiLruCache<NNet::TNetworkAddress, TIdleConnection> Connections_; - NConcurrency::TPeriodicExecutorPtr ExpiredConnectionsCollector_; + TMultiLruCache<NNet::TNetworkAddress, TPooledConnection> Cache_; + NConcurrency::TPeriodicExecutorPtr ExpirationExecutor_; + bool CheckPooledConnection(const TPooledConnection& pooledConnection); void DropExpiredConnections(); }; diff --git a/yt/yt/core/http/connection_reuse_helpers.h b/yt/yt/core/http/connection_reuse_helpers.h index 5d7039ebad..fad3243117 100644 --- a/yt/yt/core/http/connection_reuse_helpers.h +++ b/yt/yt/core/http/connection_reuse_helpers.h @@ -33,7 +33,7 @@ class TConnectionReuseWrapper public: using T::T; - ~TConnectionReuseWrapper() override; + ~TConnectionReuseWrapper(); void SetReusableState(TReusableConnectionStatePtr reusableState); diff --git a/yt/yt/core/http/stream.cpp b/yt/yt/core/http/stream.cpp index 632e035a4c..ec76d4e845 100644 --- a/yt/yt/core/http/stream.cpp +++ b/yt/yt/core/http/stream.cpp @@ -5,6 +5,8 @@ #include <yt/yt/core/misc/finally.h> +#include <library/cpp/yt/misc/global.h> + #include <util/generic/buffer.h> #include <util/string/escape.h> @@ -14,10 +16,31 @@ namespace NYT::NHttp { using namespace NConcurrency; using namespace NNet; +//////////////////////////////////////////////////////////////////////////////// + static constexpr auto& Logger = HttpLogger; //////////////////////////////////////////////////////////////////////////////// +namespace { + +using TFilteredHeaderMap = THashSet<TString, TCaseInsensitiveStringHasher, TCaseInsensitiveStringEqualityComparer>; +YT_DEFINE_GLOBAL(const TFilteredHeaderMap, FilteredHeaders, { + "transfer-encoding", + "content-length", + "connection", + "host", +}); + +YT_DEFINE_GLOBAL(const TSharedRef, Http100ContinueBuffer, TSharedRef::FromString("HTTP/1.1 100 Continue\r\n\r\n")); +YT_DEFINE_GLOBAL(const TSharedRef, CrLfBuffer, TSharedRef::FromString("\r\n")); +YT_DEFINE_GLOBAL(const TSharedRef, ZeroCrLfBuffer, TSharedRef::FromString("0\r\n")); +YT_DEFINE_GLOBAL(const TSharedRef, ZeroCrLfCrLfBuffer, TSharedRef::FromString("0\r\n\r\n")); + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + http_parser_settings THttpParser::GetParserSettings() { http_parser_settings settings; @@ -225,21 +248,21 @@ struct THttpParserTag { }; THttpInput::THttpInput( - const IConnectionPtr& connection, + IConnectionPtr connection, const TNetworkAddress& remoteAddress, - const IInvokerPtr& readInvoker, + IInvokerPtr readInvoker, EMessageType messageType, - const THttpIOConfigPtr& config) - : Connection_(connection) + THttpIOConfigPtr config) + : Connection_(std::move(connection)) , RemoteAddress_(remoteAddress) , MessageType_(messageType) - , Config_(config) + , Config_(std::move(config)) + , ReadInvoker_(std::move(readInvoker)) , InputBuffer_(TSharedMutableRef::Allocate<THttpParserTag>(Config_->ReadBufferSize)) , Parser_(messageType == EMessageType::Request ? HTTP_REQUEST : HTTP_RESPONSE) - , StartByteCount_(connection->GetReadByteCount()) - , StartStatistics_(connection->GetReadStatistics()) + , StartByteCount_(Connection_->GetReadByteCount()) + , StartStatistics_(Connection_->GetReadStatistics()) , LastProgressLogTime_(TInstant::Now()) - , ReadInvoker_(readInvoker) { } std::pair<int, int> THttpInput::GetVersion() @@ -279,7 +302,7 @@ EStatusCode THttpInput::GetStatusCode() const THeadersPtr& THttpInput::GetTrailers() { if (Parser_.GetState() != EParserState::MessageFinished) { - THROW_ERROR_EXCEPTION("Cannot access trailers while body is not fully consumed"); + THROW_ERROR(AnnotateError(TError("Cannot access trailers while body is not fully consumed"))); } const auto& trailers = Parser_.GetTrailers(); @@ -336,6 +359,13 @@ void THttpInput::Reset() StartStatistics_ = Connection_->GetReadStatistics(); } +TError THttpInput::AnnotateError(const TError& error) +{ + return error + << TErrorAttribute("connection_id", Connection_->GetId()) + << TErrorAttribute("request_id", RequestId_); +} + void THttpInput::FinishHeaders() { HeadersReceived_ = true; @@ -350,7 +380,7 @@ void THttpInput::FinishHeaders() void THttpInput::EnsureHeadersReceived() { if (!ReceiveHeaders()) { - THROW_ERROR_EXCEPTION("Connection was closed before the first byte of HTTP message"); + THROW_ERROR(AnnotateError(TError("Connection was closed before the first byte of HTTP message"))); } } @@ -361,7 +391,7 @@ bool THttpInput::ReceiveHeaders() } bool idleConnection = MessageType_ == EMessageType::Request; - TInstant start = TInstant::Now(); + auto start = TInstant::Now(); if (idleConnection) { Connection_->SetReadDeadline(start + Config_->ConnectionIdleTimeout); @@ -392,7 +422,7 @@ bool THttpInput::ReceiveHeaders() UnconsumedData_ = Parser_.Feed(UnconsumedData_); } catch (const std::exception& ex) { if (!readResult.IsOK()) { - THROW_ERROR_EXCEPTION(ex) << readResult; + THROW_ERROR(AnnotateError(TError(ex) << readResult)); } else { throw; } @@ -543,25 +573,29 @@ std::optional<TString> THttpInput::TryGetRedirectUrl() //////////////////////////////////////////////////////////////////////////////// THttpOutput::THttpOutput( - const THeadersPtr& headers, - const IConnectionPtr& connection, + THeadersPtr headers, + IConnectionPtr connection, EMessageType messageType, - const THttpIOConfigPtr& config) - : Connection_(connection) + THttpIOConfigPtr config) + : Connection_(std::move(connection)) , MessageType_(messageType) - , Config_(config) + , Config_(std::move(config)) , OnWriteFinish_(BIND_NO_PROPAGATE(&THttpOutput::OnWriteFinish, MakeWeak(this))) - , StartByteCount_(connection->GetWriteByteCount()) - , StartStatistics_(connection->GetWriteStatistics()) + , StartByteCount_(Connection_->GetWriteByteCount()) + , StartStatistics_(Connection_->GetWriteStatistics()) , LastProgressLogTime_(TInstant::Now()) - , Headers_(headers) + , Headers_(std::move(headers)) { } THttpOutput::THttpOutput( - const IConnectionPtr& connection, + IConnectionPtr connection, EMessageType messageType, - const THttpIOConfigPtr& config) - : THttpOutput(New<THeaders>(), connection, messageType, config) + THttpIOConfigPtr config) + : THttpOutput( + New<THeaders>(), + std::move(connection), + messageType, + std::move(config)) { } const THeadersPtr& THttpOutput::GetHeaders() @@ -609,6 +643,8 @@ bool THttpOutput::IsSafeToReuse() const void THttpOutput::Reset() { + RequestId_ = {}; + StartByteCount_ = Connection_->GetWriteByteCount(); StartStatistics_ = Connection_->GetWriteStatistics(); HeadersLogged_ = false; @@ -687,7 +723,7 @@ TSharedRef THttpOutput::GetHeadersPart(std::optional<size_t> contentLength) messageHeaders << "Host: " << *HostHeader_ << "\r\n"; } - Headers_->WriteTo(&messageHeaders, &FilteredHeaders_); + Headers_->WriteTo(&messageHeaders, &FilteredHeaders()); TString headers; messageHeaders.Buffer().AsString(headers); @@ -698,7 +734,7 @@ TSharedRef THttpOutput::GetTrailersPart() { TBufferOutput messageTrailers; - Trailers_->WriteTo(&messageTrailers, &FilteredHeaders_); + Trailers_->WriteTo(&messageTrailers, &FilteredHeaders()); TString trailers; messageTrailers.Buffer().AsString(trailers); @@ -713,31 +749,31 @@ TSharedRef THttpOutput::GetChunkHeader(size_t size) void THttpOutput::Flush100Continue() { if (HeadersFlushed_) { - THROW_ERROR_EXCEPTION("Cannot send 100 Continue after headers"); + THROW_ERROR(AnnotateError(TError("Cannot send 100 Continue after headers"))); } Connection_->SetWriteDeadline(TInstant::Now() + Config_->WriteIdleTimeout); - WaitFor(Connection_->Write(Http100Continue).Apply(OnWriteFinish_)) + WaitFor(Connection_->Write(Http100ContinueBuffer()).Apply(OnWriteFinish_)) .ThrowOnError(); } TFuture<void> THttpOutput::Write(const TSharedRef& data) { if (MessageFinished_) { - THROW_ERROR_EXCEPTION("Cannot write to finished HTTP message"); + THROW_ERROR(AnnotateError(TError("Cannot write to finished HTTP message"))); } std::vector<TSharedRef> writeRefs; if (!HeadersFlushed_) { HeadersFlushed_ = true; - writeRefs.emplace_back(GetHeadersPart(std::nullopt)); - writeRefs.emplace_back(CrLf); + writeRefs.push_back(GetHeadersPart(std::nullopt)); + writeRefs.push_back(CrLfBuffer()); } - if (data.Size() != 0) { - writeRefs.emplace_back(GetChunkHeader(data.Size())); - writeRefs.emplace_back(data); - writeRefs.push_back(CrLf); + if (!data.Empty()) { + writeRefs.push_back(GetChunkHeader(data.Size())); + writeRefs.push_back(data); + writeRefs.push_back(CrLfBuffer()); } Connection_->SetWriteDeadline(TInstant::Now() + Config_->WriteIdleTimeout); @@ -763,16 +799,23 @@ TFuture<void> THttpOutput::Close() return FinishChunked(); } +TError THttpOutput::AnnotateError(const TError& error) +{ + return error + << TErrorAttribute("connection_id", Connection_->GetId()) + << TErrorAttribute("request_id", RequestId_); +} + TFuture<void> THttpOutput::FinishChunked() { std::vector<TSharedRef> writeRefs; if (Trailers_) { - writeRefs.emplace_back(ZeroCrLf); - writeRefs.emplace_back(GetTrailersPart()); - writeRefs.emplace_back(CrLf); + writeRefs.push_back(ZeroCrLfBuffer()); + writeRefs.push_back(GetTrailersPart()); + writeRefs.push_back(CrLfBuffer()); } else { - writeRefs.emplace_back(ZeroCrLfCrLf); + writeRefs.push_back(ZeroCrLfCrLfBuffer()); } MessageFinished_ = true; @@ -784,7 +827,7 @@ TFuture<void> THttpOutput::FinishChunked() TFuture<void> THttpOutput::WriteBody(const TSharedRef& smallBody) { if (HeadersFlushed_ || MessageFinished_) { - THROW_ERROR_EXCEPTION("Cannot write body to partially flushed HTTP message"); + THROW_ERROR(AnnotateError(TError("Cannot write body to partially flushed HTTP message"))); } TSharedRefArray writeRefs; @@ -793,16 +836,16 @@ TFuture<void> THttpOutput::WriteBody(const TSharedRef& smallBody) std::array<TSharedRef, 4>{ GetHeadersPart(smallBody.Size()), GetTrailersPart(), - CrLf, - smallBody + CrLfBuffer(), + smallBody, }, TSharedRefArray::TCopyParts{}); } else { writeRefs = TSharedRefArray( std::array<TSharedRef, 3>{ GetHeadersPart(smallBody.Size()), - CrLf, - smallBody + CrLfBuffer(), + smallBody, }, TSharedRefArray::TCopyParts{}); } @@ -852,18 +895,4 @@ void THttpOutput::OnWriteFinish() //////////////////////////////////////////////////////////////////////////////// -const THashSet<TString, TCaseInsensitiveStringHasher, TCaseInsensitiveStringEqualityComparer> THttpOutput::FilteredHeaders_ = { - "transfer-encoding", - "content-length", - "connection", - "host", -}; - -const TSharedRef THttpOutput::Http100Continue = TSharedRef::FromString("HTTP/1.1 100 Continue\r\n\r\n"); -const TSharedRef THttpOutput::CrLf = TSharedRef::FromString("\r\n"); -const TSharedRef THttpOutput::ZeroCrLf = TSharedRef::FromString("0\r\n"); -const TSharedRef THttpOutput::ZeroCrLfCrLf = TSharedRef::FromString("0\r\n\r\n"); - -//////////////////////////////////////////////////////////////////////////////// - } // namespace NYT::NHttp diff --git a/yt/yt/core/http/stream.h b/yt/yt/core/http/stream.h index 9efcea7f7a..a45ab15f0f 100644 --- a/yt/yt/core/http/stream.h +++ b/yt/yt/core/http/stream.h @@ -87,11 +87,11 @@ class THttpInput { public: THttpInput( - const NNet::IConnectionPtr& connection, - const NNet::TNetworkAddress& peerAddress, - const IInvokerPtr& readInvoker, + NNet::IConnectionPtr connection, + const NNet::TNetworkAddress& remoteAddress, + IInvokerPtr readInvoker, EMessageType messageType, - const THttpIOConfigPtr& config); + THttpIOConfigPtr config); EMethod GetMethod() override; const TUrlRef& GetUrl() override; @@ -135,6 +135,7 @@ private: const NNet::TNetworkAddress RemoteAddress_; const EMessageType MessageType_; const THttpIOConfigPtr Config_; + const IInvokerPtr ReadInvoker_; TSharedMutableRef InputBuffer_; TSharedRef UnconsumedData_; @@ -147,7 +148,7 @@ private: int Port_; THeadersPtr Headers_; - // Debug. + // Debug TRequestId RequestId_; i64 StartByteCount_ = 0; NNet::TConnectionStatistics StartStatistics_; @@ -157,12 +158,12 @@ private: bool SafeToReuse_ = false; bool IsHttps_ = false; + TError AnnotateError(const TError& error); + void FinishHeaders(); void FinishMessage(); void EnsureHeadersReceived(); - IInvokerPtr ReadInvoker_; - TSharedRef DoRead(); void MaybeLogSlowProgress(); @@ -179,15 +180,15 @@ class THttpOutput { public: THttpOutput( - const THeadersPtr& headers, - const NNet::IConnectionPtr& connection, + THeadersPtr headers, + NNet::IConnectionPtr connection, EMessageType messageType, - const THttpIOConfigPtr& config); + THttpIOConfigPtr config); THttpOutput( - const NNet::IConnectionPtr& connection, + NNet::IConnectionPtr connection, EMessageType messageType, - const THttpIOConfigPtr& config); + THttpIOConfigPtr config); const THeadersPtr& GetHeaders() override; void SetHeaders(const THeadersPtr& headers); @@ -222,9 +223,9 @@ private: const EMessageType MessageType_; const THttpIOConfigPtr Config_; - TClosure OnWriteFinish_; + const TClosure OnWriteFinish_; - //! Debugging. + // Debug TRequestId RequestId_; i64 StartByteCount_ = 0; NNet::TConnectionStatistics StartStatistics_; @@ -235,7 +236,7 @@ private: bool ConnectionClose_ = false; - //! Headers. + // Headers THeadersPtr Headers_; std::optional<EStatusCode> Status_; std::optional<EMethod> Method_; @@ -244,9 +245,11 @@ private: bool HeadersFlushed_ = false; bool MessageFinished_ = false; - //! Trailers. + // Trailers THeadersPtr Trailers_; + TError AnnotateError(const TError& error); + TFuture<void> FinishChunked(); TSharedRef GetHeadersPart(std::optional<size_t> contentLength); @@ -254,11 +257,6 @@ private: static TSharedRef GetChunkHeader(size_t size); - static const TSharedRef Http100Continue; - static const TSharedRef CrLf; - static const TSharedRef ZeroCrLf; - static const TSharedRef ZeroCrLfCrLf; - void OnWriteFinish(); }; |