diff options
author | markov <markov@yandex-team.ru> | 2022-02-10 16:49:38 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:38 +0300 |
commit | 22a7d4be7e5b779e4deee4dfe36234a1eae13fbd (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp | |
parent | c35714eae9ffbb7be89036495af366fa51a8c097 (diff) | |
download | ydb-22a7d4be7e5b779e4deee4dfe36234a1eae13fbd.tar.gz |
Restoring authorship annotation for <markov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/http/server/http.cpp | 48 | ||||
-rw-r--r-- | library/cpp/http/server/http.h | 6 | ||||
-rw-r--r-- | library/cpp/http/server/http_ut.cpp | 266 | ||||
-rw-r--r-- | library/cpp/http/server/options.h | 14 |
4 files changed, 167 insertions, 167 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index 7ae2afe553..128583bdd7 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -59,13 +59,13 @@ public: inline void Activate(TInstant now) noexcept; inline void DeActivate(); - inline void Reject(); + inline void Reject(); public: TSocket Socket_; NAddr::IRemoteAddrRef ListenerSockAddrRef_; THttpServer::TImpl* HttpServ_ = nullptr; - bool Reject_ = false; + bool Reject_ = false; TInstant LastUsed; TInstant AcceptMoment; size_t ReceivedRequests = 0; @@ -112,7 +112,7 @@ public: bool TryRemovingUnsafe(TInstant border) noexcept { if (Conns_.Empty()) { - return false; + return false; } TClientConnection* c = &*(Conns_.Begin()); if (c->LastUsed > border) { @@ -120,7 +120,7 @@ public: } EraseUnsafe(c); delete c; - return true; + return true; } void EraseUnsafe(TClientConnection* c) noexcept { @@ -154,13 +154,13 @@ public: } void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) { - if (MaxRequestsReached()) { + if (MaxRequestsReached()) { Cb_->OnMaxConn(); bool wasRemoved = Connections->RemoveOld(TInstant::Max()); - if (!wasRemoved && Options_.RejectExcessConnections) { + if (!wasRemoved && Options_.RejectExcessConnections) { (new TClientConnection(s, this, listenerSockAddrRef))->Reject(); - return; - } + return; + } } auto connection = new TClientConnection(s, this, listenerSockAddrRef); @@ -242,7 +242,7 @@ public: // ignore result } - void AddRequest(TAutoPtr<TClientRequest> req, bool fail) { + void AddRequest(TAutoPtr<TClientRequest> req, bool fail) { struct TFailRequest: public THttpClientRequestEx { inline TFailRequest(TAutoPtr<TClientRequest> parent) { Conn_.Reset(parent->Conn_.Release()); @@ -259,7 +259,7 @@ public: } }; - if (!fail && Requests->Add(req.Get())) { + if (!fail && Requests->Add(req.Get())) { Y_UNUSED(req.Release()); } else { req = new TFailRequest(req); @@ -438,10 +438,10 @@ public: return AtomicGet(ConnectionCount); } - inline bool MaxRequestsReached() const { + inline bool MaxRequestsReached() const { return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections); - } - + } + THolder<TThread> ListenThread; TPipeHandle ListenWakeupReadFd; TPipeHandle ListenWakeupWriteFd; @@ -547,10 +547,10 @@ const IThreadPool& THttpServer::GetFailQueue() const { return Impl_->GetFailQueue(); } -bool THttpServer::MaxRequestsReached() const { - return Impl_->MaxRequestsReached(); -} - +bool THttpServer::MaxRequestsReached() const { + return Impl_->MaxRequestsReached(); +} + TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef) : Socket_(s) , ListenerSockAddrRef_(listenerSockAddrRef) @@ -591,7 +591,7 @@ void TClientConnection::OnPollEvent(TInstant now) { THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_)); AcceptMoment = now; - HttpServ_->AddRequest(obj, Reject_); + HttpServ_->AddRequest(obj, Reject_); } void TClientConnection::Activate(TInstant now) noexcept { @@ -604,12 +604,12 @@ void TClientConnection::DeActivate() { HttpServ_->Connections->Add(this); } -void TClientConnection::Reject() { - Reject_ = true; - - HttpServ_->Connections->Add(this); -} - +void TClientConnection::Reject() { + Reject_ = true; + + HttpServ_->Connections->Add(this); +} + TClientRequest::TClientRequest() { } diff --git a/library/cpp/http/server/http.h b/library/cpp/http/server/http.h index 1639e3ff59..b292d38f27 100644 --- a/library/cpp/http/server/http.h +++ b/library/cpp/http/server/http.h @@ -93,9 +93,9 @@ public: static TAtomicBase AcceptReturnsInvalidSocketCounter(); private: - bool MaxRequestsReached() const; - -private: + bool MaxRequestsReached() const; + +private: THolder<TImpl> Impl_; }; diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp index 1674efa4d2..cc62bb988e 100644 --- a/library/cpp/http/server/http_ut.cpp +++ b/library/cpp/http/server/http_ut.cpp @@ -8,7 +8,7 @@ #include <util/stream/output.h> #include <util/stream/zlib.h> #include <util/system/datetime.h> -#include <util/system/sem.h> +#include <util/system/sem.h> Y_UNIT_TEST_SUITE(THttpServerTest) { class TEchoServer: public THttpServer::ICallBack { @@ -53,89 +53,89 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { TString Res_; }; - class TSleepingServer: public THttpServer::ICallBack { - class TReplier: public TRequestReplier { - public: - inline TReplier(TSleepingServer* server) - : Server(server) - { - } - - bool DoReply(const TReplyParams& params) override { - Server->FreeThread(); - Server->Busy(1); - params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo"); - params.Output.Finish(); - Server->Replies->Inc(); - return true; - } - - private: - TSleepingServer* Server = nullptr; - }; - - public: - inline TSleepingServer(unsigned int size) - : Semaphore("conns", size) - , Semaphore2("threads", 1) - , Replies(new TAtomicCounter()) - , MaxConns(new TAtomicCounter()) - { - } - - void ResetCounters() { - Replies.Reset(new TAtomicCounter()); - MaxConns.Reset(new TAtomicCounter()); - } - - long RepliesCount() const { - return Replies->Val(); - } - - long MaxConnsCount() const { - return MaxConns->Val(); - } - - TClientRequest* CreateClient() override { - return new TReplier(this); - } - - void OnMaxConn() override { - MaxConns->Inc(); - } - - void OnFailRequest(int) override { - FreeThread(); - Busy(1); - } - - void Busy(int count) { - while (count-- > 0) { - Semaphore.Acquire(); - } - } - - void BusyThread() { - Semaphore2.Acquire(); - } - - void Free(int count) { - while (count-- > 0) { - Semaphore.Release(); - } - } - - void FreeThread() { - Semaphore2.Release(); - } - - private: - TSemaphore Semaphore; - TSemaphore Semaphore2; - THolder<TAtomicCounter> Replies; - THolder<TAtomicCounter> MaxConns; - }; - + class TSleepingServer: public THttpServer::ICallBack { + class TReplier: public TRequestReplier { + public: + inline TReplier(TSleepingServer* server) + : Server(server) + { + } + + bool DoReply(const TReplyParams& params) override { + Server->FreeThread(); + Server->Busy(1); + params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo"); + params.Output.Finish(); + Server->Replies->Inc(); + return true; + } + + private: + TSleepingServer* Server = nullptr; + }; + + public: + inline TSleepingServer(unsigned int size) + : Semaphore("conns", size) + , Semaphore2("threads", 1) + , Replies(new TAtomicCounter()) + , MaxConns(new TAtomicCounter()) + { + } + + void ResetCounters() { + Replies.Reset(new TAtomicCounter()); + MaxConns.Reset(new TAtomicCounter()); + } + + long RepliesCount() const { + return Replies->Val(); + } + + long MaxConnsCount() const { + return MaxConns->Val(); + } + + TClientRequest* CreateClient() override { + return new TReplier(this); + } + + void OnMaxConn() override { + MaxConns->Inc(); + } + + void OnFailRequest(int) override { + FreeThread(); + Busy(1); + } + + void Busy(int count) { + while (count-- > 0) { + Semaphore.Acquire(); + } + } + + void BusyThread() { + Semaphore2.Acquire(); + } + + void Free(int count) { + while (count-- > 0) { + Semaphore.Release(); + } + } + + void FreeThread() { + Semaphore2.Release(); + } + + private: + TSemaphore Semaphore; + TSemaphore Semaphore2; + THolder<TAtomicCounter> Replies; + THolder<TAtomicCounter> MaxConns; + }; + static const TString CrLf = "\r\n"; struct TTestRequest { @@ -166,10 +166,10 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { TSocket* s = nullptr; THolder<TSocket> singleReqSocket; if (KeepAliveConnection) { - if (!KeepAlivedSocket) { + if (!KeepAlivedSocket) { KeepAlivedSocket = MakeHolder<TSocket>(TNetworkAddress("localhost", Port), TDuration::Seconds(10)); - } - s = KeepAlivedSocket.Get(); + } + s = KeepAlivedSocket.Get(); } else { TNetworkAddress addr("localhost", Port); singleReqSocket.Reset(new TSocket(addr, TDuration::Seconds(10))); @@ -283,7 +283,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { TString ContentEncoding; TString Content; bool KeepAliveConnection = false; - THolder<TSocket> KeepAlivedSocket; + THolder<TSocket> KeepAlivedSocket; bool EnableResponseEncoding = false; TString Hdr; bool Expect100Continue = false; @@ -455,7 +455,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { } server.Stop(); } - + class TReleaseConnectionServer: public THttpServer::ICallBack { class TRequest: public THttpClientRequestEx { public: @@ -686,54 +686,54 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { #if 0 Y_UNIT_TEST(TestSocketsLeak) { - const bool trueFalse[] = {true, false}; - TPortManager portManager; - const ui16 port = portManager.GetPort(); - TString res = TestData(25); - TSleepingServer server(3); - THttpServer::TOptions options(port); - options.MaxConnections = 1; - options.MaxQueueSize = 1; - options.MaxFQueueSize = 2; - options.nFThreads = 2; - options.KeepAliveEnabled = true; - options.RejectExcessConnections = true; - THttpServer srv(&server, options); - UNIT_ASSERT(srv.Start()); - - for (bool keepAlive : trueFalse) { - server.ResetCounters(); + const bool trueFalse[] = {true, false}; + TPortManager portManager; + const ui16 port = portManager.GetPort(); + TString res = TestData(25); + TSleepingServer server(3); + THttpServer::TOptions options(port); + options.MaxConnections = 1; + options.MaxQueueSize = 1; + options.MaxFQueueSize = 2; + options.nFThreads = 2; + options.KeepAliveEnabled = true; + options.RejectExcessConnections = true; + THttpServer srv(&server, options); + UNIT_ASSERT(srv.Start()); + + for (bool keepAlive : trueFalse) { + server.ResetCounters(); TVector<TAutoPtr<IThreadFactory::IThread>> threads; - - server.Busy(3); - server.BusyThread(); - - for (size_t i = 0; i < 3; ++i) { + + server.Busy(3); + server.BusyThread(); + + for (size_t i = 0; i < 3; ++i) { auto func = [&server, port, keepAlive]() { - server.BusyThread(); + server.BusyThread(); THolder<TTestRequest> r = MakeHolder<TTestRequest>(port); - r->KeepAliveConnection = keepAlive; - r->Execute(); - }; + r->KeepAliveConnection = keepAlive; + r->Execute(); + }; threads.push_back(SystemThreadFactory()->Run(func)); - } - - server.FreeThread(); // all threads get connection & go to processing - Sleep(TDuration::MilliSeconds(100)); - server.BusyThread(); // we wait while connections are established by the - // system and accepted by the server + } + + server.FreeThread(); // all threads get connection & go to processing + Sleep(TDuration::MilliSeconds(100)); + server.BusyThread(); // we wait while connections are established by the + // system and accepted by the server server.Free(3); // we release all connections processing - - for (auto&& thread : threads) { - thread->Join(); - } - - server.Free(3); - server.FreeThread(); - - UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount())); - UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount())); - } - } + + for (auto&& thread : threads) { + thread->Join(); + } + + server.Free(3); + server.FreeThread(); + + UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount())); + UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount())); + } + } #endif } diff --git a/library/cpp/http/server/options.h b/library/cpp/http/server/options.h index fae4176ae0..38eda0e5e7 100644 --- a/library/cpp/http/server/options.h +++ b/library/cpp/http/server/options.h @@ -44,12 +44,12 @@ public: return *this; } - inline THttpServerOptions& EnableRejectExcessConnections(bool enable) noexcept { - RejectExcessConnections = enable; - - return *this; - } - + inline THttpServerOptions& EnableRejectExcessConnections(bool enable) noexcept { + RejectExcessConnections = enable; + + return *this; + } + inline THttpServerOptions& EnableReusePort(bool enable) noexcept { ReusePort = enable; @@ -148,7 +148,7 @@ public: bool KeepAliveEnabled = true; bool CompressionEnabled = false; - bool RejectExcessConnections = false; + bool RejectExcessConnections = false; bool ReusePort = false; // set SO_REUSEPORT socket option bool ReuseAddress = true; // set SO_REUSEADDR socket option TAddrs BindSockaddr; |