diff options
author | nadya02 <nadya02@yandex-team.com> | 2025-03-03 10:14:47 +0300 |
---|---|---|
committer | nadya02 <nadya02@yandex-team.com> | 2025-03-03 10:47:54 +0300 |
commit | 71a0af2180b56d33bf226059c660cd003f922fbc (patch) | |
tree | cef6e879c187a4e6f484f6e73f8469963c294fe9 | |
parent | 34c355040f7a0148f89890c2398aada57d5adb97 (diff) | |
download | ydb-71a0af2180b56d33bf226059c660cd003f922fbc.tar.gz |
YT-23989: Add memory tracker for http category
* Changelog entry
Type: feature
Component: proxy
Add memory tracker for http user traffic.
commit_hash:713e2e8f8c95bcd6a67cd99bf5799e1b925e7bf6
-rw-r--r-- | yt/yt/core/http/server.cpp | 43 | ||||
-rw-r--r-- | yt/yt/core/http/server.h | 6 | ||||
-rw-r--r-- | yt/yt/core/http/stream.cpp | 25 | ||||
-rw-r--r-- | yt/yt/core/http/stream.h | 11 | ||||
-rw-r--r-- | yt/yt/core/https/server.cpp | 6 | ||||
-rw-r--r-- | yt/yt/core/https/server.h | 5 |
6 files changed, 68 insertions, 28 deletions
diff --git a/yt/yt/core/http/server.cpp b/yt/yt/core/http/server.cpp index d2fda153b53..43ca58c7958 100644 --- a/yt/yt/core/http/server.cpp +++ b/yt/yt/core/http/server.cpp @@ -12,6 +12,7 @@ #include <yt/yt/core/concurrency/thread_pool_poller.h> #include <yt/yt/core/misc/finally.h> +#include <yt/yt/core/misc/memory_usage_tracker.h> #include <yt/yt/core/misc/public.h> #include <yt/yt/core/ytree/convert.h> @@ -62,6 +63,7 @@ public: IPollerPtr poller, IPollerPtr acceptor, IInvokerPtr invoker, + IMemoryUsageTrackerPtr memoryUsageTracker, IRequestPathMatcherPtr requestPathMatcher, bool ownPoller = false) : Config_(std::move(config)) @@ -69,6 +71,7 @@ public: , Poller_(std::move(poller)) , Acceptor_(std::move(acceptor)) , Invoker_(std::move(invoker)) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) , OwnPoller_(ownPoller) , RequestPathMatcher_(std::move(requestPathMatcher)) { } @@ -123,6 +126,7 @@ private: const IPollerPtr Poller_; const IPollerPtr Acceptor_; const IInvokerPtr Invoker_; + const IMemoryUsageTrackerPtr MemoryUsageTracker_; const bool OwnPoller_ = false; IRequestPathMatcherPtr RequestPathMatcher_; @@ -291,7 +295,8 @@ private: connection->GetRemoteAddress(), GetCurrentInvoker(), EMessageType::Request, - Config_); + Config_, + MemoryUsageTracker_); if (Config_->IsHttps) { request->SetHttps(); @@ -302,7 +307,8 @@ private: auto response = New<THttpOutput>( connection, EMessageType::Response, - Config_); + Config_, + MemoryUsageTracker_); while (true) { auto requestId = TRequestId::Create(); @@ -381,6 +387,7 @@ IServerPtr CreateServer( IPollerPtr poller, IPollerPtr acceptor, IInvokerPtr invoker, + IMemoryUsageTrackerPtr memoryUsageTracker, bool ownPoller) { auto handlers = New<TRequestPathMatcher>(); @@ -390,6 +397,7 @@ IServerPtr CreateServer( std::move(poller), std::move(acceptor), std::move(invoker), + std::move(memoryUsageTracker), std::move(handlers), ownPoller); } @@ -399,19 +407,15 @@ IServerPtr CreateServer( IPollerPtr poller, IPollerPtr acceptor, IInvokerPtr invoker, + IMemoryUsageTrackerPtr memoryUsageTracker, bool ownPoller) { auto address = TNetworkAddress::CreateIPv6Any(config->Port); + IListenerPtr listener; for (int i = 0;; ++i) { try { - auto listener = CreateListener(address, poller, acceptor, config->MaxBacklogSize); - return CreateServer( - std::move(config), - std::move(listener), - std::move(poller), - std::move(acceptor), - std::move(invoker), - ownPoller); + listener = CreateListener(address, poller, acceptor, config->MaxBacklogSize); + break; } catch (const std::exception& ex) { if (i + 1 == config->BindRetryCount) { throw; @@ -421,6 +425,14 @@ IServerPtr CreateServer( } } } + return CreateServer( + std::move(config), + std::move(listener), + std::move(poller), + std::move(acceptor), + std::move(invoker), + std::move(memoryUsageTracker), + ownPoller); } } // namespace @@ -440,6 +452,7 @@ IServerPtr CreateServer( std::move(poller), std::move(acceptor), std::move(invoker), + /*memoryUsageTracker*/ GetNullMemoryUsageTracker(), /*ownPoller*/ false); } @@ -447,7 +460,8 @@ IServerPtr CreateServer( TServerConfigPtr config, IListenerPtr listener, IPollerPtr poller, - IPollerPtr acceptor) + IPollerPtr acceptor, + IMemoryUsageTrackerPtr memoryUsageTracker) { auto invoker = poller->GetInvoker(); return CreateServer( @@ -456,13 +470,15 @@ IServerPtr CreateServer( std::move(poller), std::move(acceptor), std::move(invoker), + std::move(memoryUsageTracker), /*ownPoller*/ false); } IServerPtr CreateServer( TServerConfigPtr config, IPollerPtr poller, - IPollerPtr acceptor) + IPollerPtr acceptor, + IMemoryUsageTrackerPtr memoryUsageTracker) { auto invoker = poller->GetInvoker(); return CreateServer( @@ -470,6 +486,7 @@ IServerPtr CreateServer( std::move(poller), std::move(acceptor), std::move(invoker), + std::move(memoryUsageTracker), /*ownPoller*/ false); } @@ -499,6 +516,7 @@ IServerPtr CreateServer(TServerConfigPtr config, int pollerThreadCount) std::move(poller), std::move(acceptor), std::move(invoker), + /*memoryUsageTracker*/ GetNullMemoryUsageTracker(), /*ownPoller*/ true); } @@ -513,6 +531,7 @@ IServerPtr CreateServer( std::move(poller), std::move(acceptor), std::move(invoker), + /*memoryUsageTracker*/ GetNullMemoryUsageTracker(), /*ownPoller*/ false); } diff --git a/yt/yt/core/http/server.h b/yt/yt/core/http/server.h index e96720981b9..171fb70399d 100644 --- a/yt/yt/core/http/server.h +++ b/yt/yt/core/http/server.h @@ -89,14 +89,16 @@ IServerPtr CreateServer( TServerConfigPtr config, NNet::IListenerPtr listener, NConcurrency::IPollerPtr poller, - NConcurrency::IPollerPtr acceptor); + NConcurrency::IPollerPtr acceptor, + IMemoryUsageTrackerPtr memoryTracker = GetNullMemoryUsageTracker()); IServerPtr CreateServer( TServerConfigPtr config, NConcurrency::IPollerPtr poller); IServerPtr CreateServer( TServerConfigPtr config, NConcurrency::IPollerPtr poller, - NConcurrency::IPollerPtr acceptor); + NConcurrency::IPollerPtr acceptor, + IMemoryUsageTrackerPtr memoryTracker = GetNullMemoryUsageTracker()); IServerPtr CreateServer( int port, NConcurrency::IPollerPtr poller); diff --git a/yt/yt/core/http/stream.cpp b/yt/yt/core/http/stream.cpp index ec76d4e845d..7f059df8411 100644 --- a/yt/yt/core/http/stream.cpp +++ b/yt/yt/core/http/stream.cpp @@ -252,12 +252,14 @@ THttpInput::THttpInput( const TNetworkAddress& remoteAddress, IInvokerPtr readInvoker, EMessageType messageType, - THttpIOConfigPtr config) + THttpIOConfigPtr config, + IMemoryUsageTrackerPtr memoryUsageTracker) : Connection_(std::move(connection)) , RemoteAddress_(remoteAddress) , MessageType_(messageType) , Config_(std::move(config)) , ReadInvoker_(std::move(readInvoker)) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) , InputBuffer_(TSharedMutableRef::Allocate<THttpParserTag>(Config_->ReadBufferSize)) , Parser_(messageType == EMessageType::Request ? HTTP_REQUEST : HTTP_RESPONSE) , StartByteCount_(Connection_->GetReadByteCount()) @@ -514,7 +516,8 @@ TSharedRef THttpInput::DoRead() auto chunk = Parser_.GetLastBodyChunk(); if (!chunk.Empty()) { Connection_->SetReadDeadline(std::nullopt); - return chunk; + auto trackedChunk = TrackMemory(MemoryUsageTracker_, chunk); + return trackedChunk; } bool eof = false; @@ -576,11 +579,13 @@ THttpOutput::THttpOutput( THeadersPtr headers, IConnectionPtr connection, EMessageType messageType, - THttpIOConfigPtr config) + THttpIOConfigPtr config, + IMemoryUsageTrackerPtr memoryUsageTracker) : Connection_(std::move(connection)) , MessageType_(messageType) , Config_(std::move(config)) , OnWriteFinish_(BIND_NO_PROPAGATE(&THttpOutput::OnWriteFinish, MakeWeak(this))) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) , StartByteCount_(Connection_->GetWriteByteCount()) , StartStatistics_(Connection_->GetWriteStatistics()) , LastProgressLogTime_(TInstant::Now()) @@ -590,12 +595,14 @@ THttpOutput::THttpOutput( THttpOutput::THttpOutput( IConnectionPtr connection, EMessageType messageType, - THttpIOConfigPtr config) + THttpIOConfigPtr config, + IMemoryUsageTrackerPtr memoryUsageTracker) : THttpOutput( New<THeaders>(), std::move(connection), messageType, - std::move(config)) + std::move(config), + std::move(memoryUsageTracker)) { } const THeadersPtr& THttpOutput::GetHeaders() @@ -763,6 +770,8 @@ TFuture<void> THttpOutput::Write(const TSharedRef& data) THROW_ERROR(AnnotateError(TError("Cannot write to finished HTTP message"))); } + auto trackedData = TrackMemory(MemoryUsageTracker_, data); + std::vector<TSharedRef> writeRefs; if (!HeadersFlushed_) { HeadersFlushed_ = true; @@ -770,9 +779,9 @@ TFuture<void> THttpOutput::Write(const TSharedRef& data) writeRefs.push_back(CrLfBuffer()); } - if (!data.Empty()) { - writeRefs.push_back(GetChunkHeader(data.Size())); - writeRefs.push_back(data); + if (!trackedData.Empty()) { + writeRefs.push_back(GetChunkHeader(trackedData.Size())); + writeRefs.push_back(trackedData); writeRefs.push_back(CrLfBuffer()); } diff --git a/yt/yt/core/http/stream.h b/yt/yt/core/http/stream.h index a45ab15f0ff..4c11b32e11f 100644 --- a/yt/yt/core/http/stream.h +++ b/yt/yt/core/http/stream.h @@ -91,7 +91,8 @@ public: const NNet::TNetworkAddress& remoteAddress, IInvokerPtr readInvoker, EMessageType messageType, - THttpIOConfigPtr config); + THttpIOConfigPtr config, + IMemoryUsageTrackerPtr memoryUsageTracker = GetNullMemoryUsageTracker()); EMethod GetMethod() override; const TUrlRef& GetUrl() override; @@ -136,6 +137,7 @@ private: const EMessageType MessageType_; const THttpIOConfigPtr Config_; const IInvokerPtr ReadInvoker_; + const IMemoryUsageTrackerPtr MemoryUsageTracker_; TSharedMutableRef InputBuffer_; TSharedRef UnconsumedData_; @@ -183,12 +185,14 @@ public: THeadersPtr headers, NNet::IConnectionPtr connection, EMessageType messageType, - THttpIOConfigPtr config); + THttpIOConfigPtr config, + IMemoryUsageTrackerPtr memoryUsageTracker = GetNullMemoryUsageTracker()); THttpOutput( NNet::IConnectionPtr connection, EMessageType messageType, - THttpIOConfigPtr config); + THttpIOConfigPtr config, + IMemoryUsageTrackerPtr memoryUsageTracker = GetNullMemoryUsageTracker()); const THeadersPtr& GetHeaders() override; void SetHeaders(const THeadersPtr& headers); @@ -224,6 +228,7 @@ private: const THttpIOConfigPtr Config_; const TClosure OnWriteFinish_; + const IMemoryUsageTrackerPtr MemoryUsageTracker_; // Debug TRequestId RequestId_; diff --git a/yt/yt/core/https/server.cpp b/yt/yt/core/https/server.cpp index 92bdee379f5..b97cb801e82 100644 --- a/yt/yt/core/https/server.cpp +++ b/yt/yt/core/https/server.cpp @@ -112,7 +112,8 @@ IServerPtr CreateServer( const TServerConfigPtr& config, const IPollerPtr& poller, const IPollerPtr& acceptor, - const IInvokerPtr& controlInvoker) + const IInvokerPtr& controlInvoker, + const IMemoryUsageTrackerPtr& memoryTracker) { auto sslContext = New<TSslContext>(); ApplySslConfig(sslContext, config->Credentials); @@ -164,7 +165,8 @@ IServerPtr CreateServer( configCopy, tlsListener, poller, - acceptor); + acceptor, + memoryTracker); return New<TServer>(std::move(httpServer), std::move(certificateUpdater)); } diff --git a/yt/yt/core/https/server.h b/yt/yt/core/https/server.h index c6c40eeec3d..46994e8ea49 100644 --- a/yt/yt/core/https/server.h +++ b/yt/yt/core/https/server.h @@ -8,6 +8,8 @@ #include <yt/yt/core/http/public.h> +#include <yt/yt/core/misc/memory_usage_tracker.h> + namespace NYT::NHttps { //////////////////////////////////////////////////////////////////////////////// @@ -26,7 +28,8 @@ NHttp::IServerPtr CreateServer( const TServerConfigPtr& config, const NConcurrency::IPollerPtr& poller, const NConcurrency::IPollerPtr& acceptor, - const IInvokerPtr& controlInvoker); + const IInvokerPtr& controlInvoker, + const IMemoryUsageTrackerPtr& memoryTracker = GetNullMemoryUsageTracker()); //////////////////////////////////////////////////////////////////////////////// |