diff options
author | kulikov <kulikov@yandex-team.com> | 2023-08-24 21:02:28 +0300 |
---|---|---|
committer | kulikov <kulikov@yandex-team.com> | 2023-08-24 21:21:32 +0300 |
commit | e952891ca11362042404965378d76b1b579aa424 (patch) | |
tree | b325f41be2e4caa6ff1c2802ad04a9a4dada2658 | |
parent | 15c36956959084c1fefe469c2d9fda51f77fedb6 (diff) | |
download | ydb-e952891ca11362042404965378d76b1b579aa424.tar.gz |
simplify, prepare for more than one listener thread
- move out listener and thread pools initialization stage from listener thread (no actual changes, this part of code was awaited via ListenStartEvent anyway) ;
- remove ListenerStartEvent and ListenerRunningOK flag, no use now;
- make Reqs list of listening sockets class member;
- leave Reqs list destruction in listener thread (it should happen just after Shutdown but after polling loop stopped to prevent races);
- ut for server startup fail.
-rw-r--r-- | library/cpp/http/server/http.cpp | 52 | ||||
-rw-r--r-- | library/cpp/http/server/http_ut.cpp | 25 |
2 files changed, 45 insertions, 32 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index 19a16bdcb5..026dc18d65 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -186,9 +186,6 @@ public: Poller.Reset(new TSocketPoller()); Connections.Reset(new TConnections(Poller.Get(), Options_)); - // Start the listener thread - ListenerRunningOK = false; - // throws on error TPipeHandle::Pipe(ListenWakeupReadFd, ListenWakeupWriteFd); @@ -197,7 +194,24 @@ public: Poller->WaitRead(ListenWakeupReadFd, &WakeupPollAble); - ListenStartEvent.Reset(); + ErrorCode = 0; + + std::function<void(TSocket)> callback = [&](TSocket socket) { + THolder<TListenSocket> ls(new TListenSocket(socket, this)); + Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get())); + Reqs.PushBack(ls.Release()); + }; + + bool addressesBound = TryToBindAddresses(Options_, &callback); + if (!addressesBound) { + SaveErrorCode(); + return false; + } + + Requests->Start(Options_.nThreads, Options_.MaxQueueSize); + FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize); + Cb_->OnListenStart(); + try { ListenThread.Reset(new TThread(ListenSocketFunction, this)); ListenThread->Start(); @@ -206,10 +220,7 @@ public: return false; } - // Wait until the thread has completely started and return the success indicator - ListenStartEvent.Wait(); - - return ListenerRunningOK; + return true; } void Wait() { @@ -325,28 +336,6 @@ public: void ListenSocket() { TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str()); - ErrorCode = 0; - TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs; - - std::function<void(TSocket)> callback = [&](TSocket socket) { - THolder<TListenSocket> ls(new TListenSocket(socket, this)); - Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get())); - Reqs.PushBack(ls.Release()); - }; - bool addressesBound = TryToBindAddresses(Options_, &callback); - if (!addressesBound) { - SaveErrorCode(); - ListenStartEvent.Signal(); - - return; - } - - Requests->Start(Options_.nThreads, Options_.MaxQueueSize); - FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize); - Cb_->OnListenStart(); - ListenerRunningOK = true; - ListenStartEvent.Signal(); - TVector<void*> events; events.resize(Options_.EpollMaxEvents); @@ -445,15 +434,14 @@ public: } THolder<TThread> ListenThread; + TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs; TPipeHandle ListenWakeupReadFd; TPipeHandle ListenWakeupWriteFd; - TSystemEvent ListenStartEvent; TMtpQueueRef Requests; TMtpQueueRef FailRequests; TAtomic ConnectionCount = 0; THolder<TSocketPoller> Poller; THolder<TConnections> Connections; - bool ListenerRunningOK = false; int ErrorCode = 0; TOptions Options_; ICallBack* Cb_ = nullptr; diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp index b953a4ef2a..dc02c44295 100644 --- a/library/cpp/http/server/http_ut.cpp +++ b/library/cpp/http/server/http_ut.cpp @@ -909,4 +909,29 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { } } } + + Y_UNIT_TEST(StartFail) { + TString res = TestData(); + TEchoServer serverImpl(res); + { + THttpServer server(&serverImpl, THttpServer::TOptions(1)); + + UNIT_ASSERT(!server.GetErrorCode()); + UNIT_ASSERT(!server.Start()); + UNIT_ASSERT(server.GetErrorCode()); + } + + { + TPortManager pm; + const ui16 port = pm.GetPort(); + THttpServer server1(&serverImpl, THttpServer::TOptions(port)); + UNIT_ASSERT(server1.Start()); + UNIT_ASSERT(!server1.GetErrorCode()); + + THttpServer server2(&serverImpl, THttpServer::TOptions(port)); + UNIT_ASSERT(!server2.Start()); + UNIT_ASSERT(server2.GetErrorCode()); + } + + } } |