aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya02 <nadya02@yandex-team.com>2025-03-03 10:14:47 +0300
committernadya02 <nadya02@yandex-team.com>2025-03-03 10:47:54 +0300
commit71a0af2180b56d33bf226059c660cd003f922fbc (patch)
treecef6e879c187a4e6f484f6e73f8469963c294fe9
parent34c355040f7a0148f89890c2398aada57d5adb97 (diff)
downloadydb-71a0af2180b56d33bf226059c660cd003f922fbc.tar.gz
YT-23989: Add memory tracker for http category
* Changelog entry Type: feature Component: proxy Add memory tracker for http user traffic. commit_hash:713e2e8f8c95bcd6a67cd99bf5799e1b925e7bf6
-rw-r--r--yt/yt/core/http/server.cpp43
-rw-r--r--yt/yt/core/http/server.h6
-rw-r--r--yt/yt/core/http/stream.cpp25
-rw-r--r--yt/yt/core/http/stream.h11
-rw-r--r--yt/yt/core/https/server.cpp6
-rw-r--r--yt/yt/core/https/server.h5
6 files changed, 68 insertions, 28 deletions
diff --git a/yt/yt/core/http/server.cpp b/yt/yt/core/http/server.cpp
index d2fda153b53..43ca58c7958 100644
--- a/yt/yt/core/http/server.cpp
+++ b/yt/yt/core/http/server.cpp
@@ -12,6 +12,7 @@
#include <yt/yt/core/concurrency/thread_pool_poller.h>
#include <yt/yt/core/misc/finally.h>
+#include <yt/yt/core/misc/memory_usage_tracker.h>
#include <yt/yt/core/misc/public.h>
#include <yt/yt/core/ytree/convert.h>
@@ -62,6 +63,7 @@ public:
IPollerPtr poller,
IPollerPtr acceptor,
IInvokerPtr invoker,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
IRequestPathMatcherPtr requestPathMatcher,
bool ownPoller = false)
: Config_(std::move(config))
@@ -69,6 +71,7 @@ public:
, Poller_(std::move(poller))
, Acceptor_(std::move(acceptor))
, Invoker_(std::move(invoker))
+ , MemoryUsageTracker_(std::move(memoryUsageTracker))
, OwnPoller_(ownPoller)
, RequestPathMatcher_(std::move(requestPathMatcher))
{ }
@@ -123,6 +126,7 @@ private:
const IPollerPtr Poller_;
const IPollerPtr Acceptor_;
const IInvokerPtr Invoker_;
+ const IMemoryUsageTrackerPtr MemoryUsageTracker_;
const bool OwnPoller_ = false;
IRequestPathMatcherPtr RequestPathMatcher_;
@@ -291,7 +295,8 @@ private:
connection->GetRemoteAddress(),
GetCurrentInvoker(),
EMessageType::Request,
- Config_);
+ Config_,
+ MemoryUsageTracker_);
if (Config_->IsHttps) {
request->SetHttps();
@@ -302,7 +307,8 @@ private:
auto response = New<THttpOutput>(
connection,
EMessageType::Response,
- Config_);
+ Config_,
+ MemoryUsageTracker_);
while (true) {
auto requestId = TRequestId::Create();
@@ -381,6 +387,7 @@ IServerPtr CreateServer(
IPollerPtr poller,
IPollerPtr acceptor,
IInvokerPtr invoker,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
bool ownPoller)
{
auto handlers = New<TRequestPathMatcher>();
@@ -390,6 +397,7 @@ IServerPtr CreateServer(
std::move(poller),
std::move(acceptor),
std::move(invoker),
+ std::move(memoryUsageTracker),
std::move(handlers),
ownPoller);
}
@@ -399,19 +407,15 @@ IServerPtr CreateServer(
IPollerPtr poller,
IPollerPtr acceptor,
IInvokerPtr invoker,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
bool ownPoller)
{
auto address = TNetworkAddress::CreateIPv6Any(config->Port);
+ IListenerPtr listener;
for (int i = 0;; ++i) {
try {
- auto listener = CreateListener(address, poller, acceptor, config->MaxBacklogSize);
- return CreateServer(
- std::move(config),
- std::move(listener),
- std::move(poller),
- std::move(acceptor),
- std::move(invoker),
- ownPoller);
+ listener = CreateListener(address, poller, acceptor, config->MaxBacklogSize);
+ break;
} catch (const std::exception& ex) {
if (i + 1 == config->BindRetryCount) {
throw;
@@ -421,6 +425,14 @@ IServerPtr CreateServer(
}
}
}
+ return CreateServer(
+ std::move(config),
+ std::move(listener),
+ std::move(poller),
+ std::move(acceptor),
+ std::move(invoker),
+ std::move(memoryUsageTracker),
+ ownPoller);
}
} // namespace
@@ -440,6 +452,7 @@ IServerPtr CreateServer(
std::move(poller),
std::move(acceptor),
std::move(invoker),
+ /*memoryUsageTracker*/ GetNullMemoryUsageTracker(),
/*ownPoller*/ false);
}
@@ -447,7 +460,8 @@ IServerPtr CreateServer(
TServerConfigPtr config,
IListenerPtr listener,
IPollerPtr poller,
- IPollerPtr acceptor)
+ IPollerPtr acceptor,
+ IMemoryUsageTrackerPtr memoryUsageTracker)
{
auto invoker = poller->GetInvoker();
return CreateServer(
@@ -456,13 +470,15 @@ IServerPtr CreateServer(
std::move(poller),
std::move(acceptor),
std::move(invoker),
+ std::move(memoryUsageTracker),
/*ownPoller*/ false);
}
IServerPtr CreateServer(
TServerConfigPtr config,
IPollerPtr poller,
- IPollerPtr acceptor)
+ IPollerPtr acceptor,
+ IMemoryUsageTrackerPtr memoryUsageTracker)
{
auto invoker = poller->GetInvoker();
return CreateServer(
@@ -470,6 +486,7 @@ IServerPtr CreateServer(
std::move(poller),
std::move(acceptor),
std::move(invoker),
+ std::move(memoryUsageTracker),
/*ownPoller*/ false);
}
@@ -499,6 +516,7 @@ IServerPtr CreateServer(TServerConfigPtr config, int pollerThreadCount)
std::move(poller),
std::move(acceptor),
std::move(invoker),
+ /*memoryUsageTracker*/ GetNullMemoryUsageTracker(),
/*ownPoller*/ true);
}
@@ -513,6 +531,7 @@ IServerPtr CreateServer(
std::move(poller),
std::move(acceptor),
std::move(invoker),
+ /*memoryUsageTracker*/ GetNullMemoryUsageTracker(),
/*ownPoller*/ false);
}
diff --git a/yt/yt/core/http/server.h b/yt/yt/core/http/server.h
index e96720981b9..171fb70399d 100644
--- a/yt/yt/core/http/server.h
+++ b/yt/yt/core/http/server.h
@@ -89,14 +89,16 @@ IServerPtr CreateServer(
TServerConfigPtr config,
NNet::IListenerPtr listener,
NConcurrency::IPollerPtr poller,
- NConcurrency::IPollerPtr acceptor);
+ NConcurrency::IPollerPtr acceptor,
+ IMemoryUsageTrackerPtr memoryTracker = GetNullMemoryUsageTracker());
IServerPtr CreateServer(
TServerConfigPtr config,
NConcurrency::IPollerPtr poller);
IServerPtr CreateServer(
TServerConfigPtr config,
NConcurrency::IPollerPtr poller,
- NConcurrency::IPollerPtr acceptor);
+ NConcurrency::IPollerPtr acceptor,
+ IMemoryUsageTrackerPtr memoryTracker = GetNullMemoryUsageTracker());
IServerPtr CreateServer(
int port,
NConcurrency::IPollerPtr poller);
diff --git a/yt/yt/core/http/stream.cpp b/yt/yt/core/http/stream.cpp
index ec76d4e845d..7f059df8411 100644
--- a/yt/yt/core/http/stream.cpp
+++ b/yt/yt/core/http/stream.cpp
@@ -252,12 +252,14 @@ THttpInput::THttpInput(
const TNetworkAddress& remoteAddress,
IInvokerPtr readInvoker,
EMessageType messageType,
- THttpIOConfigPtr config)
+ THttpIOConfigPtr config,
+ IMemoryUsageTrackerPtr memoryUsageTracker)
: Connection_(std::move(connection))
, RemoteAddress_(remoteAddress)
, MessageType_(messageType)
, Config_(std::move(config))
, ReadInvoker_(std::move(readInvoker))
+ , MemoryUsageTracker_(std::move(memoryUsageTracker))
, InputBuffer_(TSharedMutableRef::Allocate<THttpParserTag>(Config_->ReadBufferSize))
, Parser_(messageType == EMessageType::Request ? HTTP_REQUEST : HTTP_RESPONSE)
, StartByteCount_(Connection_->GetReadByteCount())
@@ -514,7 +516,8 @@ TSharedRef THttpInput::DoRead()
auto chunk = Parser_.GetLastBodyChunk();
if (!chunk.Empty()) {
Connection_->SetReadDeadline(std::nullopt);
- return chunk;
+ auto trackedChunk = TrackMemory(MemoryUsageTracker_, chunk);
+ return trackedChunk;
}
bool eof = false;
@@ -576,11 +579,13 @@ THttpOutput::THttpOutput(
THeadersPtr headers,
IConnectionPtr connection,
EMessageType messageType,
- THttpIOConfigPtr config)
+ THttpIOConfigPtr config,
+ IMemoryUsageTrackerPtr memoryUsageTracker)
: Connection_(std::move(connection))
, MessageType_(messageType)
, Config_(std::move(config))
, OnWriteFinish_(BIND_NO_PROPAGATE(&THttpOutput::OnWriteFinish, MakeWeak(this)))
+ , MemoryUsageTracker_(std::move(memoryUsageTracker))
, StartByteCount_(Connection_->GetWriteByteCount())
, StartStatistics_(Connection_->GetWriteStatistics())
, LastProgressLogTime_(TInstant::Now())
@@ -590,12 +595,14 @@ THttpOutput::THttpOutput(
THttpOutput::THttpOutput(
IConnectionPtr connection,
EMessageType messageType,
- THttpIOConfigPtr config)
+ THttpIOConfigPtr config,
+ IMemoryUsageTrackerPtr memoryUsageTracker)
: THttpOutput(
New<THeaders>(),
std::move(connection),
messageType,
- std::move(config))
+ std::move(config),
+ std::move(memoryUsageTracker))
{ }
const THeadersPtr& THttpOutput::GetHeaders()
@@ -763,6 +770,8 @@ TFuture<void> THttpOutput::Write(const TSharedRef& data)
THROW_ERROR(AnnotateError(TError("Cannot write to finished HTTP message")));
}
+ auto trackedData = TrackMemory(MemoryUsageTracker_, data);
+
std::vector<TSharedRef> writeRefs;
if (!HeadersFlushed_) {
HeadersFlushed_ = true;
@@ -770,9 +779,9 @@ TFuture<void> THttpOutput::Write(const TSharedRef& data)
writeRefs.push_back(CrLfBuffer());
}
- if (!data.Empty()) {
- writeRefs.push_back(GetChunkHeader(data.Size()));
- writeRefs.push_back(data);
+ if (!trackedData.Empty()) {
+ writeRefs.push_back(GetChunkHeader(trackedData.Size()));
+ writeRefs.push_back(trackedData);
writeRefs.push_back(CrLfBuffer());
}
diff --git a/yt/yt/core/http/stream.h b/yt/yt/core/http/stream.h
index a45ab15f0ff..4c11b32e11f 100644
--- a/yt/yt/core/http/stream.h
+++ b/yt/yt/core/http/stream.h
@@ -91,7 +91,8 @@ public:
const NNet::TNetworkAddress& remoteAddress,
IInvokerPtr readInvoker,
EMessageType messageType,
- THttpIOConfigPtr config);
+ THttpIOConfigPtr config,
+ IMemoryUsageTrackerPtr memoryUsageTracker = GetNullMemoryUsageTracker());
EMethod GetMethod() override;
const TUrlRef& GetUrl() override;
@@ -136,6 +137,7 @@ private:
const EMessageType MessageType_;
const THttpIOConfigPtr Config_;
const IInvokerPtr ReadInvoker_;
+ const IMemoryUsageTrackerPtr MemoryUsageTracker_;
TSharedMutableRef InputBuffer_;
TSharedRef UnconsumedData_;
@@ -183,12 +185,14 @@ public:
THeadersPtr headers,
NNet::IConnectionPtr connection,
EMessageType messageType,
- THttpIOConfigPtr config);
+ THttpIOConfigPtr config,
+ IMemoryUsageTrackerPtr memoryUsageTracker = GetNullMemoryUsageTracker());
THttpOutput(
NNet::IConnectionPtr connection,
EMessageType messageType,
- THttpIOConfigPtr config);
+ THttpIOConfigPtr config,
+ IMemoryUsageTrackerPtr memoryUsageTracker = GetNullMemoryUsageTracker());
const THeadersPtr& GetHeaders() override;
void SetHeaders(const THeadersPtr& headers);
@@ -224,6 +228,7 @@ private:
const THttpIOConfigPtr Config_;
const TClosure OnWriteFinish_;
+ const IMemoryUsageTrackerPtr MemoryUsageTracker_;
// Debug
TRequestId RequestId_;
diff --git a/yt/yt/core/https/server.cpp b/yt/yt/core/https/server.cpp
index 92bdee379f5..b97cb801e82 100644
--- a/yt/yt/core/https/server.cpp
+++ b/yt/yt/core/https/server.cpp
@@ -112,7 +112,8 @@ IServerPtr CreateServer(
const TServerConfigPtr& config,
const IPollerPtr& poller,
const IPollerPtr& acceptor,
- const IInvokerPtr& controlInvoker)
+ const IInvokerPtr& controlInvoker,
+ const IMemoryUsageTrackerPtr& memoryTracker)
{
auto sslContext = New<TSslContext>();
ApplySslConfig(sslContext, config->Credentials);
@@ -164,7 +165,8 @@ IServerPtr CreateServer(
configCopy,
tlsListener,
poller,
- acceptor);
+ acceptor,
+ memoryTracker);
return New<TServer>(std::move(httpServer), std::move(certificateUpdater));
}
diff --git a/yt/yt/core/https/server.h b/yt/yt/core/https/server.h
index c6c40eeec3d..46994e8ea49 100644
--- a/yt/yt/core/https/server.h
+++ b/yt/yt/core/https/server.h
@@ -8,6 +8,8 @@
#include <yt/yt/core/http/public.h>
+#include <yt/yt/core/misc/memory_usage_tracker.h>
+
namespace NYT::NHttps {
////////////////////////////////////////////////////////////////////////////////
@@ -26,7 +28,8 @@ NHttp::IServerPtr CreateServer(
const TServerConfigPtr& config,
const NConcurrency::IPollerPtr& poller,
const NConcurrency::IPollerPtr& acceptor,
- const IInvokerPtr& controlInvoker);
+ const IInvokerPtr& controlInvoker,
+ const IMemoryUsageTrackerPtr& memoryTracker = GetNullMemoryUsageTracker());
////////////////////////////////////////////////////////////////////////////////