diff options
Diffstat (limited to 'library/cpp/http')
| -rw-r--r-- | library/cpp/http/server/http.cpp | 98 | ||||
| -rw-r--r-- | library/cpp/http/server/http_ut.cpp | 27 | ||||
| -rw-r--r-- | library/cpp/http/server/options.h | 7 |
3 files changed, 108 insertions, 24 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index e43b8415c83..58d23d994d4 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -56,6 +56,7 @@ public: inline void DeActivate(); inline void Reject(); + void ScheduleDelete(); public: TSocket Socket_; NAddr::IRemoteAddrRef ListenerSockAddrRef_; @@ -64,6 +65,11 @@ public: TInstant LastUsed; TInstant AcceptMoment; size_t ReceivedRequests = 0; + + struct TCleanupState { + ui64 ThreadMask = 0; + bool Closed = false; + } CleanupState_; }; class THttpServer::TImpl { @@ -90,6 +96,24 @@ public: } } + void Cleanup(size_t threadNum) { + if (Options.nListenerThreads < 2) { + return; + } + + TIntrusiveListWithAutoDelete<TClientConnection, TDelete> toDelete; + + { + TGuard<TMutex> g(Mutex_); + + PendingDelete_.ForEach([&toDelete, threadNum](TClientConnection * conn) { + if (!(conn->CleanupState_.ThreadMask &= ~((ui64)1 << threadNum))) { + toDelete.PushBack(conn); + } + }); + } + } + inline void Erase(TClientConnection* c, TInstant now) noexcept { TGuard<TMutex> g(Mutex_); @@ -119,7 +143,14 @@ public: return false; } EraseUnsafe(c); - delete c; + + if (Options.nListenerThreads > 1) { + c->ScheduleDelete(); + PendingDelete_.PushBack(c); + } else { + delete c; + } + return true; } @@ -133,6 +164,7 @@ public: public: TMutex Mutex_; TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_; + TIntrusiveListWithAutoDelete<TClientConnection, TDelete> PendingDelete_; TSocketPoller* Poller_ = nullptr; const THttpServerOptions& Options; }; @@ -207,10 +239,13 @@ public: Cb_->OnListenStart(); try { - ListenThread.Reset(new TThread([this]() { - ListenSocket(); - })); - ListenThread->Start(); + RunningListeners_.store(Options_.nListenerThreads); + for (size_t i = 0; i < Options_.nListenerThreads; ++i) { + ListenThreads.push_back(MakeHolder<TThread>([this, threadNum = i]() { + ListenSocket(threadNum); + })); + ListenThreads.back()->Start(); + } } catch (const yexception&) { SaveErrorCode(); return false; @@ -219,24 +254,24 @@ public: return true; } - void JoinListenerThread() { - if (ListenThread) { - ListenThread->Join(); - ListenThread.Reset(nullptr); + void JoinListenerThreads() { + while (!ListenThreads.empty()) { + ListenThreads.back()->Join(); + ListenThreads.pop_back(); } } void Wait() { Cb_->OnWait(); TGuard<TMutex> g(StopMutex); - JoinListenerThread(); + JoinListenerThreads(); } void Stop() { Shutdown(); TGuard<TMutex> g(StopMutex); - JoinListenerThread(); + JoinListenerThreads(); while (ConnectionCount) { usleep(10000); @@ -334,7 +369,7 @@ public: NAddr::IRemoteAddrRef SockAddrRef_; }; - void ListenSocket() { + void ListenSocket(size_t threadNum) { TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str()); TVector<void*> events; @@ -343,6 +378,8 @@ public: TInstant now = TInstant::Now(); for (;;) { try { + Connections->Cleanup(threadNum); + const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout; const size_t ret = Poller->WaitD(events.data(), events.size(), deadline); @@ -371,15 +408,17 @@ public: } } - while (!Reqs.Empty()) { - THolder<TListenSocket> ls(Reqs.PopFront()); + if (0 == --RunningListeners_) { + while (!Reqs.Empty()) { + THolder<TListenSocket> ls(Reqs.PopFront()); - Poller->Unwait(ls->GetSocket()); - } + Poller->Unwait(ls->GetSocket()); + } - Requests->Stop(); - FailRequests->Stop(); - Cb_->OnListenStop(); + Requests->Stop(); + FailRequests->Stop(); + Cb_->OnListenStop(); + } } void RestartRequestThreads(ui32 nTh, ui32 maxQS) { @@ -396,6 +435,16 @@ public: , Cb_(cb) , Parent_(parent) { + if (Options_.nListenerThreads > 1) { + Options_.OneShotPoll = true; + + const auto minPollTimeout = TDuration::MilliSeconds(100); + if (!Options_.PollTimeout || Options_.PollTimeout > minPollTimeout) { + Options_.PollTimeout = minPollTimeout; + } + + Y_ENSURE(Options_.nListenerThreads < 64); + } } TImpl(THttpServer* parent, ICallBack* cb, const TOptions& options, IThreadFactory* factory) @@ -434,7 +483,8 @@ public: return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections); } - THolder<TThread> ListenThread; + TVector<THolder<TThread>> ListenThreads; + std::atomic<size_t> RunningListeners_ = 0; TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs; TPipeHandle ListenWakeupReadFd; TPipeHandle ListenWakeupWriteFd; @@ -558,7 +608,15 @@ TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv, } TClientConnection::~TClientConnection() { + if (!CleanupState_.Closed) { + HttpServ_->DecreaseConnections(); + } +} +void TClientConnection::ScheduleDelete() { + Socket_.Close(); HttpServ_->DecreaseConnections(); + CleanupState_.ThreadMask = ((ui64)1 << HttpServ_->Options().nListenerThreads) - 1; + CleanupState_.Closed = true; } void TClientConnection::OnPollEvent(TInstant now) { diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp index dc02c44295c..16bbb85cb86 100644 --- a/library/cpp/http/server/http_ut.cpp +++ b/library/cpp/http/server/http_ut.cpp @@ -822,6 +822,25 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { TVector<TCounters> Counters_; }; + struct TTestConfig { + bool OneShot = false; + ui32 ListenerThreads = 1; + }; + + TVector<TTestConfig> testConfigs = { + {.OneShot = false, .ListenerThreads = 1}, + {.OneShot = true, .ListenerThreads = 1}, + {.OneShot = true, .ListenerThreads = 4}, + {.OneShot = true, .ListenerThreads = 63}, + }; + + THttpServer::TOptions ApplyConfig(const THttpServer::TOptions& opts, const TTestConfig& cfg) { + THttpServer::TOptions res = opts; + res.OneShotPoll = cfg.OneShot; + res.nListenerThreads = cfg.ListenerThreads; + return res; + } + Y_UNIT_TEST(TestStartStop) { TPortManager pm; const ui16 port = pm.GetPort(); @@ -830,9 +849,9 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { TShooter shooter(threadCount, port); TString res = TestData(); - for (bool oneShot : {true, false}) { + for (const auto& cfg : testConfigs) { TEchoServer serverImpl(res); - THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).SetOneShotPoll(oneShot)); + THttpServer server(&serverImpl, ApplyConfig(THttpServer::TOptions(port).EnableKeepAlive(true), cfg)); for (size_t i = 0; i < 100; ++i) { UNIT_ASSERT(server.Start()); shooter.WaitProgress(); @@ -884,9 +903,9 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { TString res = TestData(); - for (bool oneShot : {true, false}) { + for (const auto& cfg : testConfigs) { TMaxConnServer serverImpl(res); - THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxConnections(maxConnections).SetOneShotPoll(oneShot)); + THttpServer server(&serverImpl, ApplyConfig(THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxConnections(maxConnections), cfg)); UNIT_ASSERT(server.Start()); diff --git a/library/cpp/http/server/options.h b/library/cpp/http/server/options.h index 8a83f4f6dbf..cacd7ebedaf 100644 --- a/library/cpp/http/server/options.h +++ b/library/cpp/http/server/options.h @@ -145,6 +145,12 @@ public: return *this; } + inline THttpServerOptions& SetListenerThreads(ui32 val) { + nListenerThreads = val; + + return *this; + } + struct TAddr { TString Addr; ui16 Port; @@ -182,4 +188,5 @@ public: TString FailRequestsThreadName = "HttpServer"; bool OneShotPoll = false; + ui32 nListenerThreads = 1; }; |
