summaryrefslogtreecommitdiffstats
path: root/library/cpp/http
diff options
context:
space:
mode:
Diffstat (limited to 'library/cpp/http')
-rw-r--r--library/cpp/http/server/http.cpp98
-rw-r--r--library/cpp/http/server/http_ut.cpp27
-rw-r--r--library/cpp/http/server/options.h7
3 files changed, 108 insertions, 24 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp
index e43b8415c83..58d23d994d4 100644
--- a/library/cpp/http/server/http.cpp
+++ b/library/cpp/http/server/http.cpp
@@ -56,6 +56,7 @@ public:
inline void DeActivate();
inline void Reject();
+ void ScheduleDelete();
public:
TSocket Socket_;
NAddr::IRemoteAddrRef ListenerSockAddrRef_;
@@ -64,6 +65,11 @@ public:
TInstant LastUsed;
TInstant AcceptMoment;
size_t ReceivedRequests = 0;
+
+ struct TCleanupState {
+ ui64 ThreadMask = 0;
+ bool Closed = false;
+ } CleanupState_;
};
class THttpServer::TImpl {
@@ -90,6 +96,24 @@ public:
}
}
+ void Cleanup(size_t threadNum) {
+ if (Options.nListenerThreads < 2) {
+ return;
+ }
+
+ TIntrusiveListWithAutoDelete<TClientConnection, TDelete> toDelete;
+
+ {
+ TGuard<TMutex> g(Mutex_);
+
+ PendingDelete_.ForEach([&toDelete, threadNum](TClientConnection * conn) {
+ if (!(conn->CleanupState_.ThreadMask &= ~((ui64)1 << threadNum))) {
+ toDelete.PushBack(conn);
+ }
+ });
+ }
+ }
+
inline void Erase(TClientConnection* c, TInstant now) noexcept {
TGuard<TMutex> g(Mutex_);
@@ -119,7 +143,14 @@ public:
return false;
}
EraseUnsafe(c);
- delete c;
+
+ if (Options.nListenerThreads > 1) {
+ c->ScheduleDelete();
+ PendingDelete_.PushBack(c);
+ } else {
+ delete c;
+ }
+
return true;
}
@@ -133,6 +164,7 @@ public:
public:
TMutex Mutex_;
TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_;
+ TIntrusiveListWithAutoDelete<TClientConnection, TDelete> PendingDelete_;
TSocketPoller* Poller_ = nullptr;
const THttpServerOptions& Options;
};
@@ -207,10 +239,13 @@ public:
Cb_->OnListenStart();
try {
- ListenThread.Reset(new TThread([this]() {
- ListenSocket();
- }));
- ListenThread->Start();
+ RunningListeners_.store(Options_.nListenerThreads);
+ for (size_t i = 0; i < Options_.nListenerThreads; ++i) {
+ ListenThreads.push_back(MakeHolder<TThread>([this, threadNum = i]() {
+ ListenSocket(threadNum);
+ }));
+ ListenThreads.back()->Start();
+ }
} catch (const yexception&) {
SaveErrorCode();
return false;
@@ -219,24 +254,24 @@ public:
return true;
}
- void JoinListenerThread() {
- if (ListenThread) {
- ListenThread->Join();
- ListenThread.Reset(nullptr);
+ void JoinListenerThreads() {
+ while (!ListenThreads.empty()) {
+ ListenThreads.back()->Join();
+ ListenThreads.pop_back();
}
}
void Wait() {
Cb_->OnWait();
TGuard<TMutex> g(StopMutex);
- JoinListenerThread();
+ JoinListenerThreads();
}
void Stop() {
Shutdown();
TGuard<TMutex> g(StopMutex);
- JoinListenerThread();
+ JoinListenerThreads();
while (ConnectionCount) {
usleep(10000);
@@ -334,7 +369,7 @@ public:
NAddr::IRemoteAddrRef SockAddrRef_;
};
- void ListenSocket() {
+ void ListenSocket(size_t threadNum) {
TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str());
TVector<void*> events;
@@ -343,6 +378,8 @@ public:
TInstant now = TInstant::Now();
for (;;) {
try {
+ Connections->Cleanup(threadNum);
+
const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout;
const size_t ret = Poller->WaitD(events.data(), events.size(), deadline);
@@ -371,15 +408,17 @@ public:
}
}
- while (!Reqs.Empty()) {
- THolder<TListenSocket> ls(Reqs.PopFront());
+ if (0 == --RunningListeners_) {
+ while (!Reqs.Empty()) {
+ THolder<TListenSocket> ls(Reqs.PopFront());
- Poller->Unwait(ls->GetSocket());
- }
+ Poller->Unwait(ls->GetSocket());
+ }
- Requests->Stop();
- FailRequests->Stop();
- Cb_->OnListenStop();
+ Requests->Stop();
+ FailRequests->Stop();
+ Cb_->OnListenStop();
+ }
}
void RestartRequestThreads(ui32 nTh, ui32 maxQS) {
@@ -396,6 +435,16 @@ public:
, Cb_(cb)
, Parent_(parent)
{
+ if (Options_.nListenerThreads > 1) {
+ Options_.OneShotPoll = true;
+
+ const auto minPollTimeout = TDuration::MilliSeconds(100);
+ if (!Options_.PollTimeout || Options_.PollTimeout > minPollTimeout) {
+ Options_.PollTimeout = minPollTimeout;
+ }
+
+ Y_ENSURE(Options_.nListenerThreads < 64);
+ }
}
TImpl(THttpServer* parent, ICallBack* cb, const TOptions& options, IThreadFactory* factory)
@@ -434,7 +483,8 @@ public:
return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections);
}
- THolder<TThread> ListenThread;
+ TVector<THolder<TThread>> ListenThreads;
+ std::atomic<size_t> RunningListeners_ = 0;
TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs;
TPipeHandle ListenWakeupReadFd;
TPipeHandle ListenWakeupWriteFd;
@@ -558,7 +608,15 @@ TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv,
}
TClientConnection::~TClientConnection() {
+ if (!CleanupState_.Closed) {
+ HttpServ_->DecreaseConnections();
+ }
+}
+void TClientConnection::ScheduleDelete() {
+ Socket_.Close();
HttpServ_->DecreaseConnections();
+ CleanupState_.ThreadMask = ((ui64)1 << HttpServ_->Options().nListenerThreads) - 1;
+ CleanupState_.Closed = true;
}
void TClientConnection::OnPollEvent(TInstant now) {
diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp
index dc02c44295c..16bbb85cb86 100644
--- a/library/cpp/http/server/http_ut.cpp
+++ b/library/cpp/http/server/http_ut.cpp
@@ -822,6 +822,25 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
TVector<TCounters> Counters_;
};
+ struct TTestConfig {
+ bool OneShot = false;
+ ui32 ListenerThreads = 1;
+ };
+
+ TVector<TTestConfig> testConfigs = {
+ {.OneShot = false, .ListenerThreads = 1},
+ {.OneShot = true, .ListenerThreads = 1},
+ {.OneShot = true, .ListenerThreads = 4},
+ {.OneShot = true, .ListenerThreads = 63},
+ };
+
+ THttpServer::TOptions ApplyConfig(const THttpServer::TOptions& opts, const TTestConfig& cfg) {
+ THttpServer::TOptions res = opts;
+ res.OneShotPoll = cfg.OneShot;
+ res.nListenerThreads = cfg.ListenerThreads;
+ return res;
+ }
+
Y_UNIT_TEST(TestStartStop) {
TPortManager pm;
const ui16 port = pm.GetPort();
@@ -830,9 +849,9 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
TShooter shooter(threadCount, port);
TString res = TestData();
- for (bool oneShot : {true, false}) {
+ for (const auto& cfg : testConfigs) {
TEchoServer serverImpl(res);
- THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).SetOneShotPoll(oneShot));
+ THttpServer server(&serverImpl, ApplyConfig(THttpServer::TOptions(port).EnableKeepAlive(true), cfg));
for (size_t i = 0; i < 100; ++i) {
UNIT_ASSERT(server.Start());
shooter.WaitProgress();
@@ -884,9 +903,9 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
TString res = TestData();
- for (bool oneShot : {true, false}) {
+ for (const auto& cfg : testConfigs) {
TMaxConnServer serverImpl(res);
- THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxConnections(maxConnections).SetOneShotPoll(oneShot));
+ THttpServer server(&serverImpl, ApplyConfig(THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxConnections(maxConnections), cfg));
UNIT_ASSERT(server.Start());
diff --git a/library/cpp/http/server/options.h b/library/cpp/http/server/options.h
index 8a83f4f6dbf..cacd7ebedaf 100644
--- a/library/cpp/http/server/options.h
+++ b/library/cpp/http/server/options.h
@@ -145,6 +145,12 @@ public:
return *this;
}
+ inline THttpServerOptions& SetListenerThreads(ui32 val) {
+ nListenerThreads = val;
+
+ return *this;
+ }
+
struct TAddr {
TString Addr;
ui16 Port;
@@ -182,4 +188,5 @@ public:
TString FailRequestsThreadName = "HttpServer";
bool OneShotPoll = false;
+ ui32 nListenerThreads = 1;
};