aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya02 <nadya02@yandex-team.com>2024-09-02 20:49:36 +0300
committernadya02 <nadya02@yandex-team.com>2024-09-02 20:58:58 +0300
commit596b9c6bf6af936e655e5d86a8d2e1d8bcba81df (patch)
treedcc8eee2ffa8f99ded1054be7c7d05f3e91b43e3
parent05243a08c710e68a220b3cc6a792f24bd30a5329 (diff)
downloadydb-596b9c6bf6af936e655e5d86a8d2e1d8bcba81df.tar.gz
YT-22072: Add request dropping in http proxies when memory is overloaded
a26b0149d447c736dd6df3a186c4dc36cf58e98e
-rw-r--r--yt/yt/core/http/server.cpp179
-rw-r--r--yt/yt/core/http/server.h38
-rw-r--r--yt/yt/core/https/server.cpp10
-rw-r--r--yt/yt/core/https/server.h5
4 files changed, 143 insertions, 89 deletions
diff --git a/yt/yt/core/http/server.cpp b/yt/yt/core/http/server.cpp
index 48462c7f8d..7464803307 100644
--- a/yt/yt/core/http/server.cpp
+++ b/yt/yt/core/http/server.cpp
@@ -12,6 +12,8 @@
#include <yt/yt/core/concurrency/thread_pool_poller.h>
#include <yt/yt/core/misc/finally.h>
+#include <yt/yt/core/misc/memory_usage_tracker.h>
+#include <yt/yt/core/misc/public.h>
#include <yt/yt/core/ytree/convert.h>
@@ -56,20 +58,22 @@ class TServer
{
public:
TServer(
- const TServerConfigPtr& config,
- const IListenerPtr& listener,
- const IPollerPtr& poller,
- const IPollerPtr& acceptor,
- const IInvokerPtr& invoker,
- const IRequestPathMatcherPtr& requestPathMatcher,
+ TServerConfigPtr config,
+ IListenerPtr listener,
+ IPollerPtr poller,
+ IPollerPtr acceptor,
+ IInvokerPtr invoker,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
+ IRequestPathMatcherPtr requestPathMatcher,
bool ownPoller = false)
- : Config_(config)
- , Listener_(listener)
- , Poller_(poller)
- , Acceptor_(acceptor)
- , Invoker_(invoker)
- , RequestPathMatcher_(requestPathMatcher)
+ : Config_(std::move(config))
+ , Listener_(std::move(listener))
+ , Poller_(std::move(poller))
+ , Acceptor_(std::move(acceptor))
+ , Invoker_(std::move(invoker))
+ , MemoryUsageTracker_(std::move(memoryUsageTracker))
, OwnPoller_(ownPoller)
+ , RequestPathMatcher_(std::move(requestPathMatcher))
{ }
void AddHandler(const TString& path, const IHttpHandlerPtr& handler) override
@@ -122,9 +126,10 @@ private:
const IPollerPtr Poller_;
const IPollerPtr Acceptor_;
const IInvokerPtr Invoker_;
- IRequestPathMatcherPtr RequestPathMatcher_;
+ const IMemoryUsageTrackerPtr MemoryUsageTracker_;
const bool OwnPoller_ = false;
+ IRequestPathMatcherPtr RequestPathMatcher_;
bool Started_ = false;
std::atomic<bool> Stopped_ = false;
@@ -220,6 +225,14 @@ private:
SetRequestId(response, request->GetRequestId());
+ if (MemoryUsageTracker_ && MemoryUsageTracker_->IsExceeded()) {
+ THROW_ERROR_EXCEPTION(
+ EStatusCode::TooManyRequests,
+ "Request is dropped due to high memory pressure")
+ << TErrorAttribute("total_memory_limit", MemoryUsageTracker_->GetLimit())
+ << TErrorAttribute("memory_usage", MemoryUsageTracker_->GetUsed());
+ }
+
handler->HandleRequest(request, response);
NTracing::FlushCurrentTraceContextElapsedTime();
@@ -379,36 +392,46 @@ private:
////////////////////////////////////////////////////////////////////////////////
IServerPtr CreateServer(
- const TServerConfigPtr& config,
- const IListenerPtr& listener,
- const IPollerPtr& poller,
- const IPollerPtr& acceptor,
- const IInvokerPtr& invoker,
+ TServerConfigPtr config,
+ IListenerPtr listener,
+ IPollerPtr poller,
+ IPollerPtr acceptor,
+ IInvokerPtr invoker,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
bool ownPoller)
{
auto handlers = New<TRequestPathMatcher>();
return New<TServer>(
- config,
- listener,
- poller,
- acceptor,
- invoker,
- handlers,
+ std::move(config),
+ std::move(listener),
+ std::move(poller),
+ std::move(acceptor),
+ std::move(invoker),
+ std::move(memoryUsageTracker),
+ std::move(handlers),
ownPoller);
}
IServerPtr CreateServer(
- const TServerConfigPtr& config,
- const IPollerPtr& poller,
- const IPollerPtr& acceptor,
- const IInvokerPtr& invoker,
+ TServerConfigPtr config,
+ IPollerPtr poller,
+ IPollerPtr acceptor,
+ IInvokerPtr invoker,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
bool ownPoller)
{
auto address = TNetworkAddress::CreateIPv6Any(config->Port);
for (int i = 0;; ++i) {
try {
auto listener = CreateListener(address, poller, acceptor, config->MaxBacklogSize);
- return CreateServer(config, listener, poller, acceptor, invoker, ownPoller);
+ return CreateServer(
+ std::move(config),
+ std::move(listener),
+ std::move(poller),
+ std::move(acceptor),
+ std::move(invoker),
+ std::move(memoryUsageTracker),
+ ownPoller);
} catch (const std::exception& ex) {
if (i + 1 == config->BindRetryCount) {
throw;
@@ -425,80 +448,98 @@ IServerPtr CreateServer(
////////////////////////////////////////////////////////////////////////////////
IServerPtr CreateServer(
- const TServerConfigPtr& config,
- const IListenerPtr& listener,
- const IPollerPtr& poller)
+ TServerConfigPtr config,
+ IListenerPtr listener,
+ IPollerPtr poller)
{
+ auto acceptor = poller;
+ auto invoker = poller->GetInvoker();
return CreateServer(
- config,
- listener,
- poller,
- poller,
- poller->GetInvoker(),
+ std::move(config),
+ std::move(listener),
+ std::move(poller),
+ std::move(acceptor),
+ std::move(invoker),
+ /*memoryUsageTracker*/ GetNullMemoryUsageTracker(),
/*ownPoller*/ false);
}
IServerPtr CreateServer(
- const TServerConfigPtr& config,
- const IListenerPtr& listener,
- const IPollerPtr& poller,
- const IPollerPtr& acceptor)
+ TServerConfigPtr config,
+ IListenerPtr listener,
+ IPollerPtr poller,
+ IPollerPtr acceptor,
+ IMemoryUsageTrackerPtr memoryUsageTracker)
{
+ auto invoker = poller->GetInvoker();
return CreateServer(
- config,
- listener,
- poller,
- acceptor,
- poller->GetInvoker(),
+ std::move(config),
+ std::move(listener),
+ std::move(poller),
+ std::move(acceptor),
+ std::move(invoker),
+ std::move(memoryUsageTracker),
/*ownPoller*/ false);
}
-IServerPtr CreateServer(const TServerConfigPtr& config, const IPollerPtr& poller, const IPollerPtr& acceptor)
+IServerPtr CreateServer(
+ TServerConfigPtr config,
+ IPollerPtr poller,
+ IPollerPtr acceptor,
+ IMemoryUsageTrackerPtr memoryUsageTracker)
{
+ auto invoker = poller->GetInvoker();
return CreateServer(
- config,
- poller,
- acceptor,
- poller->GetInvoker(),
+ std::move(config),
+ std::move(poller),
+ std::move(acceptor),
+ std::move(invoker),
+ std::move(memoryUsageTracker),
/*ownPoller*/ false);
}
-IServerPtr CreateServer(const TServerConfigPtr& config, const IPollerPtr& poller)
+IServerPtr CreateServer(TServerConfigPtr config, IPollerPtr poller)
{
+ auto acceptor = poller;
return CreateServer(
- config,
- poller,
- poller);
+ std::move(config),
+ std::move(poller),
+ std::move(acceptor));
}
-IServerPtr CreateServer(int port, const IPollerPtr& poller)
+IServerPtr CreateServer(int port, IPollerPtr poller)
{
auto config = New<TServerConfig>();
config->Port = port;
- return CreateServer(config, poller);
+ return CreateServer(std::move(config), std::move(poller));
}
-IServerPtr CreateServer(const TServerConfigPtr& config, int pollerThreadCount)
+IServerPtr CreateServer(TServerConfigPtr config, int pollerThreadCount)
{
auto poller = CreateThreadPoolPoller(pollerThreadCount, config->ServerName);
+ auto acceptor = poller;
+ auto invoker = poller->GetInvoker();
return CreateServer(
- config,
- poller,
- poller,
- poller->GetInvoker(),
+ std::move(config),
+ std::move(poller),
+ std::move(acceptor),
+ std::move(invoker),
+ /*memoryUsageTracker*/ GetNullMemoryUsageTracker(),
/*ownPoller*/ true);
}
IServerPtr CreateServer(
- const TServerConfigPtr& config,
- const NConcurrency::IPollerPtr& poller,
- const IInvokerPtr& invoker)
+ TServerConfigPtr config,
+ NConcurrency::IPollerPtr poller,
+ IInvokerPtr invoker)
{
+ auto acceptor = poller;
return CreateServer(
- config,
- poller,
- poller,
- invoker,
+ std::move(config),
+ std::move(poller),
+ std::move(acceptor),
+ std::move(invoker),
+ /*memoryUsageTracker*/ GetNullMemoryUsageTracker(),
/*ownPoller*/ false);
}
diff --git a/yt/yt/core/http/server.h b/yt/yt/core/http/server.h
index 3006f0333f..171fb70399 100644
--- a/yt/yt/core/http/server.h
+++ b/yt/yt/core/http/server.h
@@ -9,6 +9,8 @@
#include <yt/yt/core/actions/future.h>
+#include <yt/yt/core/misc/public.h>
+
#include <library/cpp/yt/memory/ref.h>
namespace NYT::NHttp {
@@ -80,31 +82,33 @@ DEFINE_REFCOUNTED_TYPE(IServer)
////////////////////////////////////////////////////////////////////////////////
IServerPtr CreateServer(
- const TServerConfigPtr& config,
- const NNet::IListenerPtr& listener,
- const NConcurrency::IPollerPtr& poller);
+ TServerConfigPtr config,
+ NNet::IListenerPtr listener,
+ NConcurrency::IPollerPtr poller);
IServerPtr CreateServer(
- const TServerConfigPtr& config,
- const NNet::IListenerPtr& listener,
- const NConcurrency::IPollerPtr& poller,
- const NConcurrency::IPollerPtr& acceptor);
+ TServerConfigPtr config,
+ NNet::IListenerPtr listener,
+ NConcurrency::IPollerPtr poller,
+ NConcurrency::IPollerPtr acceptor,
+ IMemoryUsageTrackerPtr memoryTracker = GetNullMemoryUsageTracker());
IServerPtr CreateServer(
- const TServerConfigPtr& config,
- const NConcurrency::IPollerPtr& poller);
+ TServerConfigPtr config,
+ NConcurrency::IPollerPtr poller);
IServerPtr CreateServer(
- const TServerConfigPtr& config,
- const NConcurrency::IPollerPtr& poller,
- const NConcurrency::IPollerPtr& acceptor);
+ TServerConfigPtr config,
+ NConcurrency::IPollerPtr poller,
+ NConcurrency::IPollerPtr acceptor,
+ IMemoryUsageTrackerPtr memoryTracker = GetNullMemoryUsageTracker());
IServerPtr CreateServer(
int port,
- const NConcurrency::IPollerPtr& poller);
+ NConcurrency::IPollerPtr poller);
IServerPtr CreateServer(
- const TServerConfigPtr& config,
+ TServerConfigPtr config,
int pollerThreadCount = 1);
IServerPtr CreateServer(
- const TServerConfigPtr& config,
- const NConcurrency::IPollerPtr& poller,
- const IInvokerPtr& invoker);
+ TServerConfigPtr config,
+ NConcurrency::IPollerPtr poller,
+ IInvokerPtr invoker);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/https/server.cpp b/yt/yt/core/https/server.cpp
index 06a34a7176..b97cb801e8 100644
--- a/yt/yt/core/https/server.cpp
+++ b/yt/yt/core/https/server.cpp
@@ -112,7 +112,8 @@ IServerPtr CreateServer(
const TServerConfigPtr& config,
const IPollerPtr& poller,
const IPollerPtr& acceptor,
- const IInvokerPtr& controlInvoker)
+ const IInvokerPtr& controlInvoker,
+ const IMemoryUsageTrackerPtr& memoryTracker)
{
auto sslContext = New<TSslContext>();
ApplySslConfig(sslContext, config->Credentials);
@@ -160,7 +161,12 @@ IServerPtr CreateServer(
auto configCopy = CloneYsonStruct(config);
configCopy->IsHttps = true;
- auto httpServer = NHttp::CreateServer(configCopy, tlsListener, poller, acceptor);
+ auto httpServer = NHttp::CreateServer(
+ configCopy,
+ tlsListener,
+ poller,
+ acceptor,
+ memoryTracker);
return New<TServer>(std::move(httpServer), std::move(certificateUpdater));
}
diff --git a/yt/yt/core/https/server.h b/yt/yt/core/https/server.h
index c6c40eeec3..46994e8ea4 100644
--- a/yt/yt/core/https/server.h
+++ b/yt/yt/core/https/server.h
@@ -8,6 +8,8 @@
#include <yt/yt/core/http/public.h>
+#include <yt/yt/core/misc/memory_usage_tracker.h>
+
namespace NYT::NHttps {
////////////////////////////////////////////////////////////////////////////////
@@ -26,7 +28,8 @@ NHttp::IServerPtr CreateServer(
const TServerConfigPtr& config,
const NConcurrency::IPollerPtr& poller,
const NConcurrency::IPollerPtr& acceptor,
- const IInvokerPtr& controlInvoker);
+ const IInvokerPtr& controlInvoker,
+ const IMemoryUsageTrackerPtr& memoryTracker = GetNullMemoryUsageTracker());
////////////////////////////////////////////////////////////////////////////////