diff options
author | nadya02 <nadya02@yandex-team.com> | 2024-09-02 20:49:36 +0300 |
---|---|---|
committer | nadya02 <nadya02@yandex-team.com> | 2024-09-02 20:58:58 +0300 |
commit | 596b9c6bf6af936e655e5d86a8d2e1d8bcba81df (patch) | |
tree | dcc8eee2ffa8f99ded1054be7c7d05f3e91b43e3 | |
parent | 05243a08c710e68a220b3cc6a792f24bd30a5329 (diff) | |
download | ydb-596b9c6bf6af936e655e5d86a8d2e1d8bcba81df.tar.gz |
YT-22072: Add request dropping in http proxies when memory is overloaded
a26b0149d447c736dd6df3a186c4dc36cf58e98e
-rw-r--r-- | yt/yt/core/http/server.cpp | 179 | ||||
-rw-r--r-- | yt/yt/core/http/server.h | 38 | ||||
-rw-r--r-- | yt/yt/core/https/server.cpp | 10 | ||||
-rw-r--r-- | yt/yt/core/https/server.h | 5 |
4 files changed, 143 insertions, 89 deletions
diff --git a/yt/yt/core/http/server.cpp b/yt/yt/core/http/server.cpp index 48462c7f8d..7464803307 100644 --- a/yt/yt/core/http/server.cpp +++ b/yt/yt/core/http/server.cpp @@ -12,6 +12,8 @@ #include <yt/yt/core/concurrency/thread_pool_poller.h> #include <yt/yt/core/misc/finally.h> +#include <yt/yt/core/misc/memory_usage_tracker.h> +#include <yt/yt/core/misc/public.h> #include <yt/yt/core/ytree/convert.h> @@ -56,20 +58,22 @@ class TServer { public: TServer( - const TServerConfigPtr& config, - const IListenerPtr& listener, - const IPollerPtr& poller, - const IPollerPtr& acceptor, - const IInvokerPtr& invoker, - const IRequestPathMatcherPtr& requestPathMatcher, + TServerConfigPtr config, + IListenerPtr listener, + IPollerPtr poller, + IPollerPtr acceptor, + IInvokerPtr invoker, + IMemoryUsageTrackerPtr memoryUsageTracker, + IRequestPathMatcherPtr requestPathMatcher, bool ownPoller = false) - : Config_(config) - , Listener_(listener) - , Poller_(poller) - , Acceptor_(acceptor) - , Invoker_(invoker) - , RequestPathMatcher_(requestPathMatcher) + : Config_(std::move(config)) + , Listener_(std::move(listener)) + , Poller_(std::move(poller)) + , Acceptor_(std::move(acceptor)) + , Invoker_(std::move(invoker)) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) , OwnPoller_(ownPoller) + , RequestPathMatcher_(std::move(requestPathMatcher)) { } void AddHandler(const TString& path, const IHttpHandlerPtr& handler) override @@ -122,9 +126,10 @@ private: const IPollerPtr Poller_; const IPollerPtr Acceptor_; const IInvokerPtr Invoker_; - IRequestPathMatcherPtr RequestPathMatcher_; + const IMemoryUsageTrackerPtr MemoryUsageTracker_; const bool OwnPoller_ = false; + IRequestPathMatcherPtr RequestPathMatcher_; bool Started_ = false; std::atomic<bool> Stopped_ = false; @@ -220,6 +225,14 @@ private: SetRequestId(response, request->GetRequestId()); + if (MemoryUsageTracker_ && MemoryUsageTracker_->IsExceeded()) { + THROW_ERROR_EXCEPTION( + EStatusCode::TooManyRequests, + "Request is dropped due to high memory pressure") + << TErrorAttribute("total_memory_limit", MemoryUsageTracker_->GetLimit()) + << TErrorAttribute("memory_usage", MemoryUsageTracker_->GetUsed()); + } + handler->HandleRequest(request, response); NTracing::FlushCurrentTraceContextElapsedTime(); @@ -379,36 +392,46 @@ private: //////////////////////////////////////////////////////////////////////////////// IServerPtr CreateServer( - const TServerConfigPtr& config, - const IListenerPtr& listener, - const IPollerPtr& poller, - const IPollerPtr& acceptor, - const IInvokerPtr& invoker, + TServerConfigPtr config, + IListenerPtr listener, + IPollerPtr poller, + IPollerPtr acceptor, + IInvokerPtr invoker, + IMemoryUsageTrackerPtr memoryUsageTracker, bool ownPoller) { auto handlers = New<TRequestPathMatcher>(); return New<TServer>( - config, - listener, - poller, - acceptor, - invoker, - handlers, + std::move(config), + std::move(listener), + std::move(poller), + std::move(acceptor), + std::move(invoker), + std::move(memoryUsageTracker), + std::move(handlers), ownPoller); } IServerPtr CreateServer( - const TServerConfigPtr& config, - const IPollerPtr& poller, - const IPollerPtr& acceptor, - const IInvokerPtr& invoker, + TServerConfigPtr config, + IPollerPtr poller, + IPollerPtr acceptor, + IInvokerPtr invoker, + IMemoryUsageTrackerPtr memoryUsageTracker, bool ownPoller) { auto address = TNetworkAddress::CreateIPv6Any(config->Port); for (int i = 0;; ++i) { try { auto listener = CreateListener(address, poller, acceptor, config->MaxBacklogSize); - return CreateServer(config, listener, poller, acceptor, invoker, ownPoller); + return CreateServer( + std::move(config), + std::move(listener), + std::move(poller), + std::move(acceptor), + std::move(invoker), + std::move(memoryUsageTracker), + ownPoller); } catch (const std::exception& ex) { if (i + 1 == config->BindRetryCount) { throw; @@ -425,80 +448,98 @@ IServerPtr CreateServer( //////////////////////////////////////////////////////////////////////////////// IServerPtr CreateServer( - const TServerConfigPtr& config, - const IListenerPtr& listener, - const IPollerPtr& poller) + TServerConfigPtr config, + IListenerPtr listener, + IPollerPtr poller) { + auto acceptor = poller; + auto invoker = poller->GetInvoker(); return CreateServer( - config, - listener, - poller, - poller, - poller->GetInvoker(), + std::move(config), + std::move(listener), + std::move(poller), + std::move(acceptor), + std::move(invoker), + /*memoryUsageTracker*/ GetNullMemoryUsageTracker(), /*ownPoller*/ false); } IServerPtr CreateServer( - const TServerConfigPtr& config, - const IListenerPtr& listener, - const IPollerPtr& poller, - const IPollerPtr& acceptor) + TServerConfigPtr config, + IListenerPtr listener, + IPollerPtr poller, + IPollerPtr acceptor, + IMemoryUsageTrackerPtr memoryUsageTracker) { + auto invoker = poller->GetInvoker(); return CreateServer( - config, - listener, - poller, - acceptor, - poller->GetInvoker(), + std::move(config), + std::move(listener), + std::move(poller), + std::move(acceptor), + std::move(invoker), + std::move(memoryUsageTracker), /*ownPoller*/ false); } -IServerPtr CreateServer(const TServerConfigPtr& config, const IPollerPtr& poller, const IPollerPtr& acceptor) +IServerPtr CreateServer( + TServerConfigPtr config, + IPollerPtr poller, + IPollerPtr acceptor, + IMemoryUsageTrackerPtr memoryUsageTracker) { + auto invoker = poller->GetInvoker(); return CreateServer( - config, - poller, - acceptor, - poller->GetInvoker(), + std::move(config), + std::move(poller), + std::move(acceptor), + std::move(invoker), + std::move(memoryUsageTracker), /*ownPoller*/ false); } -IServerPtr CreateServer(const TServerConfigPtr& config, const IPollerPtr& poller) +IServerPtr CreateServer(TServerConfigPtr config, IPollerPtr poller) { + auto acceptor = poller; return CreateServer( - config, - poller, - poller); + std::move(config), + std::move(poller), + std::move(acceptor)); } -IServerPtr CreateServer(int port, const IPollerPtr& poller) +IServerPtr CreateServer(int port, IPollerPtr poller) { auto config = New<TServerConfig>(); config->Port = port; - return CreateServer(config, poller); + return CreateServer(std::move(config), std::move(poller)); } -IServerPtr CreateServer(const TServerConfigPtr& config, int pollerThreadCount) +IServerPtr CreateServer(TServerConfigPtr config, int pollerThreadCount) { auto poller = CreateThreadPoolPoller(pollerThreadCount, config->ServerName); + auto acceptor = poller; + auto invoker = poller->GetInvoker(); return CreateServer( - config, - poller, - poller, - poller->GetInvoker(), + std::move(config), + std::move(poller), + std::move(acceptor), + std::move(invoker), + /*memoryUsageTracker*/ GetNullMemoryUsageTracker(), /*ownPoller*/ true); } IServerPtr CreateServer( - const TServerConfigPtr& config, - const NConcurrency::IPollerPtr& poller, - const IInvokerPtr& invoker) + TServerConfigPtr config, + NConcurrency::IPollerPtr poller, + IInvokerPtr invoker) { + auto acceptor = poller; return CreateServer( - config, - poller, - poller, - invoker, + std::move(config), + std::move(poller), + std::move(acceptor), + std::move(invoker), + /*memoryUsageTracker*/ GetNullMemoryUsageTracker(), /*ownPoller*/ false); } diff --git a/yt/yt/core/http/server.h b/yt/yt/core/http/server.h index 3006f0333f..171fb70399 100644 --- a/yt/yt/core/http/server.h +++ b/yt/yt/core/http/server.h @@ -9,6 +9,8 @@ #include <yt/yt/core/actions/future.h> +#include <yt/yt/core/misc/public.h> + #include <library/cpp/yt/memory/ref.h> namespace NYT::NHttp { @@ -80,31 +82,33 @@ DEFINE_REFCOUNTED_TYPE(IServer) //////////////////////////////////////////////////////////////////////////////// IServerPtr CreateServer( - const TServerConfigPtr& config, - const NNet::IListenerPtr& listener, - const NConcurrency::IPollerPtr& poller); + TServerConfigPtr config, + NNet::IListenerPtr listener, + NConcurrency::IPollerPtr poller); IServerPtr CreateServer( - const TServerConfigPtr& config, - const NNet::IListenerPtr& listener, - const NConcurrency::IPollerPtr& poller, - const NConcurrency::IPollerPtr& acceptor); + TServerConfigPtr config, + NNet::IListenerPtr listener, + NConcurrency::IPollerPtr poller, + NConcurrency::IPollerPtr acceptor, + IMemoryUsageTrackerPtr memoryTracker = GetNullMemoryUsageTracker()); IServerPtr CreateServer( - const TServerConfigPtr& config, - const NConcurrency::IPollerPtr& poller); + TServerConfigPtr config, + NConcurrency::IPollerPtr poller); IServerPtr CreateServer( - const TServerConfigPtr& config, - const NConcurrency::IPollerPtr& poller, - const NConcurrency::IPollerPtr& acceptor); + TServerConfigPtr config, + NConcurrency::IPollerPtr poller, + NConcurrency::IPollerPtr acceptor, + IMemoryUsageTrackerPtr memoryTracker = GetNullMemoryUsageTracker()); IServerPtr CreateServer( int port, - const NConcurrency::IPollerPtr& poller); + NConcurrency::IPollerPtr poller); IServerPtr CreateServer( - const TServerConfigPtr& config, + TServerConfigPtr config, int pollerThreadCount = 1); IServerPtr CreateServer( - const TServerConfigPtr& config, - const NConcurrency::IPollerPtr& poller, - const IInvokerPtr& invoker); + TServerConfigPtr config, + NConcurrency::IPollerPtr poller, + IInvokerPtr invoker); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/https/server.cpp b/yt/yt/core/https/server.cpp index 06a34a7176..b97cb801e8 100644 --- a/yt/yt/core/https/server.cpp +++ b/yt/yt/core/https/server.cpp @@ -112,7 +112,8 @@ IServerPtr CreateServer( const TServerConfigPtr& config, const IPollerPtr& poller, const IPollerPtr& acceptor, - const IInvokerPtr& controlInvoker) + const IInvokerPtr& controlInvoker, + const IMemoryUsageTrackerPtr& memoryTracker) { auto sslContext = New<TSslContext>(); ApplySslConfig(sslContext, config->Credentials); @@ -160,7 +161,12 @@ IServerPtr CreateServer( auto configCopy = CloneYsonStruct(config); configCopy->IsHttps = true; - auto httpServer = NHttp::CreateServer(configCopy, tlsListener, poller, acceptor); + auto httpServer = NHttp::CreateServer( + configCopy, + tlsListener, + poller, + acceptor, + memoryTracker); return New<TServer>(std::move(httpServer), std::move(certificateUpdater)); } diff --git a/yt/yt/core/https/server.h b/yt/yt/core/https/server.h index c6c40eeec3..46994e8ea4 100644 --- a/yt/yt/core/https/server.h +++ b/yt/yt/core/https/server.h @@ -8,6 +8,8 @@ #include <yt/yt/core/http/public.h> +#include <yt/yt/core/misc/memory_usage_tracker.h> + namespace NYT::NHttps { //////////////////////////////////////////////////////////////////////////////// @@ -26,7 +28,8 @@ NHttp::IServerPtr CreateServer( const TServerConfigPtr& config, const NConcurrency::IPollerPtr& poller, const NConcurrency::IPollerPtr& acceptor, - const IInvokerPtr& controlInvoker); + const IInvokerPtr& controlInvoker, + const IMemoryUsageTrackerPtr& memoryTracker = GetNullMemoryUsageTracker()); //////////////////////////////////////////////////////////////////////////////// |