aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2024-09-30 22:50:52 +0300
committerbabenko <babenko@yandex-team.com>2024-09-30 23:01:01 +0300
commitd19bc73d3c420fac320c383e649321a318c56b82 (patch)
tree84de53a7b0451679447105c96ee3a5be9cf17677
parent803c95f77d7e098750be07c125e78f892ec7c169 (diff)
downloadydb-d19bc73d3c420fac320c383e649321a318c56b82.tar.gz
Cosmetics and better logging HTTP input/output
commit_hash:78c959d8b1d506e78fa0d49b89525db73487a75c
-rw-r--r--yt/yt/core/http/connection_pool.cpp65
-rw-r--r--yt/yt/core/http/connection_pool.h25
-rw-r--r--yt/yt/core/http/connection_reuse_helpers.h2
-rw-r--r--yt/yt/core/http/stream.cpp143
-rw-r--r--yt/yt/core/http/stream.h40
5 files changed, 157 insertions, 118 deletions
diff --git a/yt/yt/core/http/connection_pool.cpp b/yt/yt/core/http/connection_pool.cpp
index 27e6965067..8c3c0fd4fa 100644
--- a/yt/yt/core/http/connection_pool.cpp
+++ b/yt/yt/core/http/connection_pool.cpp
@@ -17,12 +17,12 @@ static constexpr auto& Logger = HttpLogger;
////////////////////////////////////////////////////////////////////////////////
-TDuration TIdleConnection::GetIdleTime() const
+TDuration TConnectionPool::TPooledConnection::GetIdleTime() const
{
return TInstant::Now() - InsertionTime;
}
-bool TIdleConnection::IsOK() const
+bool TConnectionPool::TPooledConnection::IsValid() const
{
return Connection->IsIdle();
}
@@ -35,8 +35,8 @@ TConnectionPool::TConnectionPool(
IInvokerPtr invoker)
: Dialer_(std::move(dialer))
, Config_(std::move(config))
- , Connections_(Config_->MaxIdleConnections)
- , ExpiredConnectionsCollector_(
+ , Cache_(Config_->MaxIdleConnections)
+ , ExpirationExecutor_(
New<TPeriodicExecutor>(
std::move(invoker),
BIND(&TConnectionPool::DropExpiredConnections, MakeWeak(this)),
@@ -44,13 +44,13 @@ TConnectionPool::TConnectionPool(
Config_->ConnectionIdleTimeout)))
{
if (Config_->MaxIdleConnections > 0) {
- ExpiredConnectionsCollector_->Start();
+ ExpirationExecutor_->Start();
}
}
TConnectionPool::~TConnectionPool()
{
- YT_UNUSED_FUTURE(ExpiredConnectionsCollector_->Stop());
+ YT_UNUSED_FUTURE(ExpirationExecutor_->Stop());
}
TFuture<IConnectionPtr> TConnectionPool::Connect(
@@ -59,12 +59,10 @@ TFuture<IConnectionPtr> TConnectionPool::Connect(
{
{
auto guard = Guard(SpinLock_);
-
- while (auto item = Connections_.TryExtract(address)) {
- if (item->GetIdleTime() < Config_->ConnectionIdleTimeout && item->IsOK()) {
- auto&& connection = item->Connection;
- YT_LOG_DEBUG("Connection is extracted from cache (Address: %v, ConnectionId: %v)",
- address,
+ while (auto pooledConnection = Cache_.TryExtract(address)) {
+ if (CheckPooledConnection(*pooledConnection)) {
+ auto connection = std::move(pooledConnection->Connection);
+ YT_LOG_DEBUG("Connection is extracted from cache (ConnectionId: %v)",
connection->GetId());
return MakeFuture<IConnectionPtr>(std::move(connection));
}
@@ -76,32 +74,47 @@ TFuture<IConnectionPtr> TConnectionPool::Connect(
void TConnectionPool::Release(const IConnectionPtr& connection)
{
- YT_LOG_DEBUG("Connection is put to cache (Address: %v, ConnectionId: %v)",
- connection->GetRemoteAddress(),
+ YT_LOG_DEBUG("Connection is put to cache (ConnectionId: %v)",
connection->GetId());
- auto guard = Guard(SpinLock_);
- Connections_.Insert(connection->GetRemoteAddress(), {connection, TInstant::Now()});
+ {
+ auto guard = Guard(SpinLock_);
+ Cache_.Insert(connection->GetRemoteAddress(), {connection, TInstant::Now()});
+ }
+}
+
+bool TConnectionPool::CheckPooledConnection(const TPooledConnection& pooledConnection)
+{
+ auto idleTime = pooledConnection.GetIdleTime();
+ if (idleTime > Config_->ConnectionIdleTimeout) {
+ YT_LOG_DEBUG("Connection evicted from cache due to idle timeout (ConnectionId: %v)",
+ pooledConnection.Connection->GetId());
+ return false;
+ }
+
+ if (!pooledConnection.IsValid()) {
+ YT_LOG_DEBUG("Connection evicted from cache due to invalid state (ConnectionId: %v)",
+ pooledConnection.Connection->GetId());
+ return false;
+ }
+
+ return true;
}
void TConnectionPool::DropExpiredConnections()
{
auto guard = Guard(SpinLock_);
- decltype(Connections_) validConnections(Config_->MaxIdleConnections);
+ decltype(Cache_) newCache(Config_->MaxIdleConnections);
- while (Connections_.GetSize() > 0) {
- auto idleConnection = Connections_.Pop();
- if (idleConnection.GetIdleTime() < Config_->ConnectionIdleTimeout && idleConnection.IsOK()) {
- validConnections.Insert(idleConnection.Connection->GetRemoteAddress(), idleConnection);
- } else {
- YT_LOG_DEBUG("Connection expired from cache (Address: %v, ConnectionId: %v)",
- idleConnection.Connection->GetRemoteAddress(),
- idleConnection.Connection->GetId());
+ while (Cache_.GetSize() > 0) {
+ auto pooledConnection = Cache_.Pop();
+ if (CheckPooledConnection(pooledConnection)) {
+ newCache.Insert(pooledConnection.Connection->GetRemoteAddress(), pooledConnection);
}
}
- Connections_ = std::move(validConnections);
+ Cache_ = std::move(newCache);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/http/connection_pool.h b/yt/yt/core/http/connection_pool.h
index cacde74e14..d977f58efb 100644
--- a/yt/yt/core/http/connection_pool.h
+++ b/yt/yt/core/http/connection_pool.h
@@ -16,17 +16,6 @@ namespace NYT::NHttp {
////////////////////////////////////////////////////////////////////////////////
-struct TIdleConnection
-{
- NNet::IConnectionPtr Connection;
- TInstant InsertionTime;
-
- TDuration GetIdleTime() const;
- bool IsOK() const;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
class TConnectionPool
: public TRefCounted
{
@@ -48,10 +37,20 @@ private:
const NNet::IDialerPtr Dialer_;
const TClientConfigPtr Config_;
+ struct TPooledConnection
+ {
+ NNet::IConnectionPtr Connection;
+ TInstant InsertionTime;
+
+ TDuration GetIdleTime() const;
+ bool IsValid() const;
+ };
+
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
- TMultiLruCache<NNet::TNetworkAddress, TIdleConnection> Connections_;
- NConcurrency::TPeriodicExecutorPtr ExpiredConnectionsCollector_;
+ TMultiLruCache<NNet::TNetworkAddress, TPooledConnection> Cache_;
+ NConcurrency::TPeriodicExecutorPtr ExpirationExecutor_;
+ bool CheckPooledConnection(const TPooledConnection& pooledConnection);
void DropExpiredConnections();
};
diff --git a/yt/yt/core/http/connection_reuse_helpers.h b/yt/yt/core/http/connection_reuse_helpers.h
index 5d7039ebad..fad3243117 100644
--- a/yt/yt/core/http/connection_reuse_helpers.h
+++ b/yt/yt/core/http/connection_reuse_helpers.h
@@ -33,7 +33,7 @@ class TConnectionReuseWrapper
public:
using T::T;
- ~TConnectionReuseWrapper() override;
+ ~TConnectionReuseWrapper();
void SetReusableState(TReusableConnectionStatePtr reusableState);
diff --git a/yt/yt/core/http/stream.cpp b/yt/yt/core/http/stream.cpp
index 632e035a4c..ec76d4e845 100644
--- a/yt/yt/core/http/stream.cpp
+++ b/yt/yt/core/http/stream.cpp
@@ -5,6 +5,8 @@
#include <yt/yt/core/misc/finally.h>
+#include <library/cpp/yt/misc/global.h>
+
#include <util/generic/buffer.h>
#include <util/string/escape.h>
@@ -14,10 +16,31 @@ namespace NYT::NHttp {
using namespace NConcurrency;
using namespace NNet;
+////////////////////////////////////////////////////////////////////////////////
+
static constexpr auto& Logger = HttpLogger;
////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+using TFilteredHeaderMap = THashSet<TString, TCaseInsensitiveStringHasher, TCaseInsensitiveStringEqualityComparer>;
+YT_DEFINE_GLOBAL(const TFilteredHeaderMap, FilteredHeaders, {
+ "transfer-encoding",
+ "content-length",
+ "connection",
+ "host",
+});
+
+YT_DEFINE_GLOBAL(const TSharedRef, Http100ContinueBuffer, TSharedRef::FromString("HTTP/1.1 100 Continue\r\n\r\n"));
+YT_DEFINE_GLOBAL(const TSharedRef, CrLfBuffer, TSharedRef::FromString("\r\n"));
+YT_DEFINE_GLOBAL(const TSharedRef, ZeroCrLfBuffer, TSharedRef::FromString("0\r\n"));
+YT_DEFINE_GLOBAL(const TSharedRef, ZeroCrLfCrLfBuffer, TSharedRef::FromString("0\r\n\r\n"));
+
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
http_parser_settings THttpParser::GetParserSettings()
{
http_parser_settings settings;
@@ -225,21 +248,21 @@ struct THttpParserTag
{ };
THttpInput::THttpInput(
- const IConnectionPtr& connection,
+ IConnectionPtr connection,
const TNetworkAddress& remoteAddress,
- const IInvokerPtr& readInvoker,
+ IInvokerPtr readInvoker,
EMessageType messageType,
- const THttpIOConfigPtr& config)
- : Connection_(connection)
+ THttpIOConfigPtr config)
+ : Connection_(std::move(connection))
, RemoteAddress_(remoteAddress)
, MessageType_(messageType)
- , Config_(config)
+ , Config_(std::move(config))
+ , ReadInvoker_(std::move(readInvoker))
, InputBuffer_(TSharedMutableRef::Allocate<THttpParserTag>(Config_->ReadBufferSize))
, Parser_(messageType == EMessageType::Request ? HTTP_REQUEST : HTTP_RESPONSE)
- , StartByteCount_(connection->GetReadByteCount())
- , StartStatistics_(connection->GetReadStatistics())
+ , StartByteCount_(Connection_->GetReadByteCount())
+ , StartStatistics_(Connection_->GetReadStatistics())
, LastProgressLogTime_(TInstant::Now())
- , ReadInvoker_(readInvoker)
{ }
std::pair<int, int> THttpInput::GetVersion()
@@ -279,7 +302,7 @@ EStatusCode THttpInput::GetStatusCode()
const THeadersPtr& THttpInput::GetTrailers()
{
if (Parser_.GetState() != EParserState::MessageFinished) {
- THROW_ERROR_EXCEPTION("Cannot access trailers while body is not fully consumed");
+ THROW_ERROR(AnnotateError(TError("Cannot access trailers while body is not fully consumed")));
}
const auto& trailers = Parser_.GetTrailers();
@@ -336,6 +359,13 @@ void THttpInput::Reset()
StartStatistics_ = Connection_->GetReadStatistics();
}
+TError THttpInput::AnnotateError(const TError& error)
+{
+ return error
+ << TErrorAttribute("connection_id", Connection_->GetId())
+ << TErrorAttribute("request_id", RequestId_);
+}
+
void THttpInput::FinishHeaders()
{
HeadersReceived_ = true;
@@ -350,7 +380,7 @@ void THttpInput::FinishHeaders()
void THttpInput::EnsureHeadersReceived()
{
if (!ReceiveHeaders()) {
- THROW_ERROR_EXCEPTION("Connection was closed before the first byte of HTTP message");
+ THROW_ERROR(AnnotateError(TError("Connection was closed before the first byte of HTTP message")));
}
}
@@ -361,7 +391,7 @@ bool THttpInput::ReceiveHeaders()
}
bool idleConnection = MessageType_ == EMessageType::Request;
- TInstant start = TInstant::Now();
+ auto start = TInstant::Now();
if (idleConnection) {
Connection_->SetReadDeadline(start + Config_->ConnectionIdleTimeout);
@@ -392,7 +422,7 @@ bool THttpInput::ReceiveHeaders()
UnconsumedData_ = Parser_.Feed(UnconsumedData_);
} catch (const std::exception& ex) {
if (!readResult.IsOK()) {
- THROW_ERROR_EXCEPTION(ex) << readResult;
+ THROW_ERROR(AnnotateError(TError(ex) << readResult));
} else {
throw;
}
@@ -543,25 +573,29 @@ std::optional<TString> THttpInput::TryGetRedirectUrl()
////////////////////////////////////////////////////////////////////////////////
THttpOutput::THttpOutput(
- const THeadersPtr& headers,
- const IConnectionPtr& connection,
+ THeadersPtr headers,
+ IConnectionPtr connection,
EMessageType messageType,
- const THttpIOConfigPtr& config)
- : Connection_(connection)
+ THttpIOConfigPtr config)
+ : Connection_(std::move(connection))
, MessageType_(messageType)
- , Config_(config)
+ , Config_(std::move(config))
, OnWriteFinish_(BIND_NO_PROPAGATE(&THttpOutput::OnWriteFinish, MakeWeak(this)))
- , StartByteCount_(connection->GetWriteByteCount())
- , StartStatistics_(connection->GetWriteStatistics())
+ , StartByteCount_(Connection_->GetWriteByteCount())
+ , StartStatistics_(Connection_->GetWriteStatistics())
, LastProgressLogTime_(TInstant::Now())
- , Headers_(headers)
+ , Headers_(std::move(headers))
{ }
THttpOutput::THttpOutput(
- const IConnectionPtr& connection,
+ IConnectionPtr connection,
EMessageType messageType,
- const THttpIOConfigPtr& config)
- : THttpOutput(New<THeaders>(), connection, messageType, config)
+ THttpIOConfigPtr config)
+ : THttpOutput(
+ New<THeaders>(),
+ std::move(connection),
+ messageType,
+ std::move(config))
{ }
const THeadersPtr& THttpOutput::GetHeaders()
@@ -609,6 +643,8 @@ bool THttpOutput::IsSafeToReuse() const
void THttpOutput::Reset()
{
+ RequestId_ = {};
+
StartByteCount_ = Connection_->GetWriteByteCount();
StartStatistics_ = Connection_->GetWriteStatistics();
HeadersLogged_ = false;
@@ -687,7 +723,7 @@ TSharedRef THttpOutput::GetHeadersPart(std::optional<size_t> contentLength)
messageHeaders << "Host: " << *HostHeader_ << "\r\n";
}
- Headers_->WriteTo(&messageHeaders, &FilteredHeaders_);
+ Headers_->WriteTo(&messageHeaders, &FilteredHeaders());
TString headers;
messageHeaders.Buffer().AsString(headers);
@@ -698,7 +734,7 @@ TSharedRef THttpOutput::GetTrailersPart()
{
TBufferOutput messageTrailers;
- Trailers_->WriteTo(&messageTrailers, &FilteredHeaders_);
+ Trailers_->WriteTo(&messageTrailers, &FilteredHeaders());
TString trailers;
messageTrailers.Buffer().AsString(trailers);
@@ -713,31 +749,31 @@ TSharedRef THttpOutput::GetChunkHeader(size_t size)
void THttpOutput::Flush100Continue()
{
if (HeadersFlushed_) {
- THROW_ERROR_EXCEPTION("Cannot send 100 Continue after headers");
+ THROW_ERROR(AnnotateError(TError("Cannot send 100 Continue after headers")));
}
Connection_->SetWriteDeadline(TInstant::Now() + Config_->WriteIdleTimeout);
- WaitFor(Connection_->Write(Http100Continue).Apply(OnWriteFinish_))
+ WaitFor(Connection_->Write(Http100ContinueBuffer()).Apply(OnWriteFinish_))
.ThrowOnError();
}
TFuture<void> THttpOutput::Write(const TSharedRef& data)
{
if (MessageFinished_) {
- THROW_ERROR_EXCEPTION("Cannot write to finished HTTP message");
+ THROW_ERROR(AnnotateError(TError("Cannot write to finished HTTP message")));
}
std::vector<TSharedRef> writeRefs;
if (!HeadersFlushed_) {
HeadersFlushed_ = true;
- writeRefs.emplace_back(GetHeadersPart(std::nullopt));
- writeRefs.emplace_back(CrLf);
+ writeRefs.push_back(GetHeadersPart(std::nullopt));
+ writeRefs.push_back(CrLfBuffer());
}
- if (data.Size() != 0) {
- writeRefs.emplace_back(GetChunkHeader(data.Size()));
- writeRefs.emplace_back(data);
- writeRefs.push_back(CrLf);
+ if (!data.Empty()) {
+ writeRefs.push_back(GetChunkHeader(data.Size()));
+ writeRefs.push_back(data);
+ writeRefs.push_back(CrLfBuffer());
}
Connection_->SetWriteDeadline(TInstant::Now() + Config_->WriteIdleTimeout);
@@ -763,16 +799,23 @@ TFuture<void> THttpOutput::Close()
return FinishChunked();
}
+TError THttpOutput::AnnotateError(const TError& error)
+{
+ return error
+ << TErrorAttribute("connection_id", Connection_->GetId())
+ << TErrorAttribute("request_id", RequestId_);
+}
+
TFuture<void> THttpOutput::FinishChunked()
{
std::vector<TSharedRef> writeRefs;
if (Trailers_) {
- writeRefs.emplace_back(ZeroCrLf);
- writeRefs.emplace_back(GetTrailersPart());
- writeRefs.emplace_back(CrLf);
+ writeRefs.push_back(ZeroCrLfBuffer());
+ writeRefs.push_back(GetTrailersPart());
+ writeRefs.push_back(CrLfBuffer());
} else {
- writeRefs.emplace_back(ZeroCrLfCrLf);
+ writeRefs.push_back(ZeroCrLfCrLfBuffer());
}
MessageFinished_ = true;
@@ -784,7 +827,7 @@ TFuture<void> THttpOutput::FinishChunked()
TFuture<void> THttpOutput::WriteBody(const TSharedRef& smallBody)
{
if (HeadersFlushed_ || MessageFinished_) {
- THROW_ERROR_EXCEPTION("Cannot write body to partially flushed HTTP message");
+ THROW_ERROR(AnnotateError(TError("Cannot write body to partially flushed HTTP message")));
}
TSharedRefArray writeRefs;
@@ -793,16 +836,16 @@ TFuture<void> THttpOutput::WriteBody(const TSharedRef& smallBody)
std::array<TSharedRef, 4>{
GetHeadersPart(smallBody.Size()),
GetTrailersPart(),
- CrLf,
- smallBody
+ CrLfBuffer(),
+ smallBody,
},
TSharedRefArray::TCopyParts{});
} else {
writeRefs = TSharedRefArray(
std::array<TSharedRef, 3>{
GetHeadersPart(smallBody.Size()),
- CrLf,
- smallBody
+ CrLfBuffer(),
+ smallBody,
},
TSharedRefArray::TCopyParts{});
}
@@ -852,18 +895,4 @@ void THttpOutput::OnWriteFinish()
////////////////////////////////////////////////////////////////////////////////
-const THashSet<TString, TCaseInsensitiveStringHasher, TCaseInsensitiveStringEqualityComparer> THttpOutput::FilteredHeaders_ = {
- "transfer-encoding",
- "content-length",
- "connection",
- "host",
-};
-
-const TSharedRef THttpOutput::Http100Continue = TSharedRef::FromString("HTTP/1.1 100 Continue\r\n\r\n");
-const TSharedRef THttpOutput::CrLf = TSharedRef::FromString("\r\n");
-const TSharedRef THttpOutput::ZeroCrLf = TSharedRef::FromString("0\r\n");
-const TSharedRef THttpOutput::ZeroCrLfCrLf = TSharedRef::FromString("0\r\n\r\n");
-
-////////////////////////////////////////////////////////////////////////////////
-
} // namespace NYT::NHttp
diff --git a/yt/yt/core/http/stream.h b/yt/yt/core/http/stream.h
index 9efcea7f7a..a45ab15f0f 100644
--- a/yt/yt/core/http/stream.h
+++ b/yt/yt/core/http/stream.h
@@ -87,11 +87,11 @@ class THttpInput
{
public:
THttpInput(
- const NNet::IConnectionPtr& connection,
- const NNet::TNetworkAddress& peerAddress,
- const IInvokerPtr& readInvoker,
+ NNet::IConnectionPtr connection,
+ const NNet::TNetworkAddress& remoteAddress,
+ IInvokerPtr readInvoker,
EMessageType messageType,
- const THttpIOConfigPtr& config);
+ THttpIOConfigPtr config);
EMethod GetMethod() override;
const TUrlRef& GetUrl() override;
@@ -135,6 +135,7 @@ private:
const NNet::TNetworkAddress RemoteAddress_;
const EMessageType MessageType_;
const THttpIOConfigPtr Config_;
+ const IInvokerPtr ReadInvoker_;
TSharedMutableRef InputBuffer_;
TSharedRef UnconsumedData_;
@@ -147,7 +148,7 @@ private:
int Port_;
THeadersPtr Headers_;
- // Debug.
+ // Debug
TRequestId RequestId_;
i64 StartByteCount_ = 0;
NNet::TConnectionStatistics StartStatistics_;
@@ -157,12 +158,12 @@ private:
bool SafeToReuse_ = false;
bool IsHttps_ = false;
+ TError AnnotateError(const TError& error);
+
void FinishHeaders();
void FinishMessage();
void EnsureHeadersReceived();
- IInvokerPtr ReadInvoker_;
-
TSharedRef DoRead();
void MaybeLogSlowProgress();
@@ -179,15 +180,15 @@ class THttpOutput
{
public:
THttpOutput(
- const THeadersPtr& headers,
- const NNet::IConnectionPtr& connection,
+ THeadersPtr headers,
+ NNet::IConnectionPtr connection,
EMessageType messageType,
- const THttpIOConfigPtr& config);
+ THttpIOConfigPtr config);
THttpOutput(
- const NNet::IConnectionPtr& connection,
+ NNet::IConnectionPtr connection,
EMessageType messageType,
- const THttpIOConfigPtr& config);
+ THttpIOConfigPtr config);
const THeadersPtr& GetHeaders() override;
void SetHeaders(const THeadersPtr& headers);
@@ -222,9 +223,9 @@ private:
const EMessageType MessageType_;
const THttpIOConfigPtr Config_;
- TClosure OnWriteFinish_;
+ const TClosure OnWriteFinish_;
- //! Debugging.
+ // Debug
TRequestId RequestId_;
i64 StartByteCount_ = 0;
NNet::TConnectionStatistics StartStatistics_;
@@ -235,7 +236,7 @@ private:
bool ConnectionClose_ = false;
- //! Headers.
+ // Headers
THeadersPtr Headers_;
std::optional<EStatusCode> Status_;
std::optional<EMethod> Method_;
@@ -244,9 +245,11 @@ private:
bool HeadersFlushed_ = false;
bool MessageFinished_ = false;
- //! Trailers.
+ // Trailers
THeadersPtr Trailers_;
+ TError AnnotateError(const TError& error);
+
TFuture<void> FinishChunked();
TSharedRef GetHeadersPart(std::optional<size_t> contentLength);
@@ -254,11 +257,6 @@ private:
static TSharedRef GetChunkHeader(size_t size);
- static const TSharedRef Http100Continue;
- static const TSharedRef CrLf;
- static const TSharedRef ZeroCrLf;
- static const TSharedRef ZeroCrLfCrLf;
-
void OnWriteFinish();
};