aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkulikov <kulikov@yandex-team.com>2023-08-24 21:02:28 +0300
committerkulikov <kulikov@yandex-team.com>2023-08-24 21:21:32 +0300
commite952891ca11362042404965378d76b1b579aa424 (patch)
treeb325f41be2e4caa6ff1c2802ad04a9a4dada2658
parent15c36956959084c1fefe469c2d9fda51f77fedb6 (diff)
downloadydb-e952891ca11362042404965378d76b1b579aa424.tar.gz
simplify, prepare for more than one listener thread
- move out listener and thread pools initialization stage from listener thread (no actual changes, this part of code was awaited via ListenStartEvent anyway) ; - remove ListenerStartEvent and ListenerRunningOK flag, no use now; - make Reqs list of listening sockets class member; - leave Reqs list destruction in listener thread (it should happen just after Shutdown but after polling loop stopped to prevent races); - ut for server startup fail.
-rw-r--r--library/cpp/http/server/http.cpp52
-rw-r--r--library/cpp/http/server/http_ut.cpp25
2 files changed, 45 insertions, 32 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp
index 19a16bdcb5..026dc18d65 100644
--- a/library/cpp/http/server/http.cpp
+++ b/library/cpp/http/server/http.cpp
@@ -186,9 +186,6 @@ public:
Poller.Reset(new TSocketPoller());
Connections.Reset(new TConnections(Poller.Get(), Options_));
- // Start the listener thread
- ListenerRunningOK = false;
-
// throws on error
TPipeHandle::Pipe(ListenWakeupReadFd, ListenWakeupWriteFd);
@@ -197,7 +194,24 @@ public:
Poller->WaitRead(ListenWakeupReadFd, &WakeupPollAble);
- ListenStartEvent.Reset();
+ ErrorCode = 0;
+
+ std::function<void(TSocket)> callback = [&](TSocket socket) {
+ THolder<TListenSocket> ls(new TListenSocket(socket, this));
+ Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get()));
+ Reqs.PushBack(ls.Release());
+ };
+
+ bool addressesBound = TryToBindAddresses(Options_, &callback);
+ if (!addressesBound) {
+ SaveErrorCode();
+ return false;
+ }
+
+ Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
+ FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize);
+ Cb_->OnListenStart();
+
try {
ListenThread.Reset(new TThread(ListenSocketFunction, this));
ListenThread->Start();
@@ -206,10 +220,7 @@ public:
return false;
}
- // Wait until the thread has completely started and return the success indicator
- ListenStartEvent.Wait();
-
- return ListenerRunningOK;
+ return true;
}
void Wait() {
@@ -325,28 +336,6 @@ public:
void ListenSocket() {
TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str());
- ErrorCode = 0;
- TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs;
-
- std::function<void(TSocket)> callback = [&](TSocket socket) {
- THolder<TListenSocket> ls(new TListenSocket(socket, this));
- Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get()));
- Reqs.PushBack(ls.Release());
- };
- bool addressesBound = TryToBindAddresses(Options_, &callback);
- if (!addressesBound) {
- SaveErrorCode();
- ListenStartEvent.Signal();
-
- return;
- }
-
- Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
- FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize);
- Cb_->OnListenStart();
- ListenerRunningOK = true;
- ListenStartEvent.Signal();
-
TVector<void*> events;
events.resize(Options_.EpollMaxEvents);
@@ -445,15 +434,14 @@ public:
}
THolder<TThread> ListenThread;
+ TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs;
TPipeHandle ListenWakeupReadFd;
TPipeHandle ListenWakeupWriteFd;
- TSystemEvent ListenStartEvent;
TMtpQueueRef Requests;
TMtpQueueRef FailRequests;
TAtomic ConnectionCount = 0;
THolder<TSocketPoller> Poller;
THolder<TConnections> Connections;
- bool ListenerRunningOK = false;
int ErrorCode = 0;
TOptions Options_;
ICallBack* Cb_ = nullptr;
diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp
index b953a4ef2a..dc02c44295 100644
--- a/library/cpp/http/server/http_ut.cpp
+++ b/library/cpp/http/server/http_ut.cpp
@@ -909,4 +909,29 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
}
}
}
+
+ Y_UNIT_TEST(StartFail) {
+ TString res = TestData();
+ TEchoServer serverImpl(res);
+ {
+ THttpServer server(&serverImpl, THttpServer::TOptions(1));
+
+ UNIT_ASSERT(!server.GetErrorCode());
+ UNIT_ASSERT(!server.Start());
+ UNIT_ASSERT(server.GetErrorCode());
+ }
+
+ {
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+ THttpServer server1(&serverImpl, THttpServer::TOptions(port));
+ UNIT_ASSERT(server1.Start());
+ UNIT_ASSERT(!server1.GetErrorCode());
+
+ THttpServer server2(&serverImpl, THttpServer::TOptions(port));
+ UNIT_ASSERT(!server2.Start());
+ UNIT_ASSERT(server2.GetErrorCode());
+ }
+
+ }
}