diff options
author | kulikov <kulikov@yandex-team.com> | 2023-08-29 21:38:44 +0300 |
---|---|---|
committer | kulikov <kulikov@yandex-team.com> | 2023-08-30 02:24:59 +0300 |
commit | 6b340452cae7be442a49905343f8b76f1a801bf6 (patch) | |
tree | 067a33a699464a5922cc667f5fceb2c88cf0dc1b /library/cpp | |
parent | 2821dc198c6906c906f42b2e2a5748412222af7c (diff) | |
download | ydb-6b340452cae7be442a49905343f8b76f1a801bf6.tar.gz |
more preparations for more than one listener thread
- apply one shot poll (under option) for listening socket too;
- some code rearrangements.
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/http/server/http.cpp | 39 |
1 files changed, 20 insertions, 19 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index 026dc18d65..e43b8415c8 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -137,16 +137,6 @@ public: const THttpServerOptions& Options; }; - static void* ListenSocketFunction(void* param) { - try { - ((TImpl*)param)->ListenSocket(); - } catch (...) { - - } - - return nullptr; - } - TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) { THolder<TClientRequest> obj(Cb_->CreateClient()); @@ -198,7 +188,11 @@ public: std::function<void(TSocket)> callback = [&](TSocket socket) { THolder<TListenSocket> ls(new TListenSocket(socket, this)); - Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get())); + if (Options_.OneShotPoll) { + Poller->WaitReadOneShot(socket, static_cast<IPollAble*>(ls.Get())); + } else { + Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get())); + } Reqs.PushBack(ls.Release()); }; @@ -213,7 +207,9 @@ public: Cb_->OnListenStart(); try { - ListenThread.Reset(new TThread(ListenSocketFunction, this)); + ListenThread.Reset(new TThread([this]() { + ListenSocket(); + })); ListenThread->Start(); } catch (const yexception&) { SaveErrorCode(); @@ -223,23 +219,24 @@ public: return true; } - void Wait() { - Cb_->OnWait(); - TGuard<TMutex> g(StopMutex); + void JoinListenerThread() { if (ListenThread) { ListenThread->Join(); ListenThread.Reset(nullptr); } } + void Wait() { + Cb_->OnWait(); + TGuard<TMutex> g(StopMutex); + JoinListenerThread(); + } + void Stop() { Shutdown(); TGuard<TMutex> g(StopMutex); - if (ListenThread) { - ListenThread->Join(); - ListenThread.Reset(nullptr); - } + JoinListenerThread(); while (ConnectionCount) { usleep(10000); @@ -316,6 +313,10 @@ public: void OnPollEvent(TInstant) override { SOCKET s = ::accept(S_, nullptr, nullptr); + if (Server_->Options_.OneShotPoll) { + Server_->Poller->WaitReadOneShot(S_, this); + } + if (s == INVALID_SOCKET) { ythrow yexception() << "accept: " << LastSystemErrorText(); } |