aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/http
diff options
context:
space:
mode:
authorkulikov <kulikov@yandex-team.com>2023-08-29 21:38:44 +0300
committerkulikov <kulikov@yandex-team.com>2023-08-30 02:24:59 +0300
commit6b340452cae7be442a49905343f8b76f1a801bf6 (patch)
tree067a33a699464a5922cc667f5fceb2c88cf0dc1b /library/cpp/http
parent2821dc198c6906c906f42b2e2a5748412222af7c (diff)
downloadydb-6b340452cae7be442a49905343f8b76f1a801bf6.tar.gz
more preparations for more than one listener thread
- apply one shot poll (under option) for listening socket too; - some code rearrangements.
Diffstat (limited to 'library/cpp/http')
-rw-r--r--library/cpp/http/server/http.cpp39
1 files changed, 20 insertions, 19 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp
index 026dc18d65..e43b8415c8 100644
--- a/library/cpp/http/server/http.cpp
+++ b/library/cpp/http/server/http.cpp
@@ -137,16 +137,6 @@ public:
const THttpServerOptions& Options;
};
- static void* ListenSocketFunction(void* param) {
- try {
- ((TImpl*)param)->ListenSocket();
- } catch (...) {
-
- }
-
- return nullptr;
- }
-
TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) {
THolder<TClientRequest> obj(Cb_->CreateClient());
@@ -198,7 +188,11 @@ public:
std::function<void(TSocket)> callback = [&](TSocket socket) {
THolder<TListenSocket> ls(new TListenSocket(socket, this));
- Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get()));
+ if (Options_.OneShotPoll) {
+ Poller->WaitReadOneShot(socket, static_cast<IPollAble*>(ls.Get()));
+ } else {
+ Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get()));
+ }
Reqs.PushBack(ls.Release());
};
@@ -213,7 +207,9 @@ public:
Cb_->OnListenStart();
try {
- ListenThread.Reset(new TThread(ListenSocketFunction, this));
+ ListenThread.Reset(new TThread([this]() {
+ ListenSocket();
+ }));
ListenThread->Start();
} catch (const yexception&) {
SaveErrorCode();
@@ -223,23 +219,24 @@ public:
return true;
}
- void Wait() {
- Cb_->OnWait();
- TGuard<TMutex> g(StopMutex);
+ void JoinListenerThread() {
if (ListenThread) {
ListenThread->Join();
ListenThread.Reset(nullptr);
}
}
+ void Wait() {
+ Cb_->OnWait();
+ TGuard<TMutex> g(StopMutex);
+ JoinListenerThread();
+ }
+
void Stop() {
Shutdown();
TGuard<TMutex> g(StopMutex);
- if (ListenThread) {
- ListenThread->Join();
- ListenThread.Reset(nullptr);
- }
+ JoinListenerThread();
while (ConnectionCount) {
usleep(10000);
@@ -316,6 +313,10 @@ public:
void OnPollEvent(TInstant) override {
SOCKET s = ::accept(S_, nullptr, nullptr);
+ if (Server_->Options_.OneShotPoll) {
+ Server_->Poller->WaitReadOneShot(S_, this);
+ }
+
if (s == INVALID_SOCKET) {
ythrow yexception() << "accept: " << LastSystemErrorText();
}