diff options
| author | markov <[email protected]> | 2022-02-10 16:49:38 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:49:38 +0300 | 
| commit | 22a7d4be7e5b779e4deee4dfe36234a1eae13fbd (patch) | |
| tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp | |
| parent | c35714eae9ffbb7be89036495af366fa51a8c097 (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/http/server/http.cpp | 48 | ||||
| -rw-r--r-- | library/cpp/http/server/http.h | 6 | ||||
| -rw-r--r-- | library/cpp/http/server/http_ut.cpp | 266 | ||||
| -rw-r--r-- | library/cpp/http/server/options.h | 14 | 
4 files changed, 167 insertions, 167 deletions
| diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index 7ae2afe5537..128583bdd70 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -59,13 +59,13 @@ public:      inline void Activate(TInstant now) noexcept;      inline void DeActivate(); -    inline void Reject();  +    inline void Reject();  public:      TSocket Socket_;      NAddr::IRemoteAddrRef ListenerSockAddrRef_;      THttpServer::TImpl* HttpServ_ = nullptr; -    bool Reject_ = false;  +    bool Reject_ = false;      TInstant LastUsed;      TInstant AcceptMoment;      size_t ReceivedRequests = 0; @@ -112,7 +112,7 @@ public:          bool TryRemovingUnsafe(TInstant border) noexcept {              if (Conns_.Empty()) { -                return false;  +                return false;              }              TClientConnection* c = &*(Conns_.Begin());              if (c->LastUsed > border) { @@ -120,7 +120,7 @@ public:              }              EraseUnsafe(c);              delete c; -            return true;  +            return true;          }          void EraseUnsafe(TClientConnection* c) noexcept { @@ -154,13 +154,13 @@ public:      }      void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) { -        if (MaxRequestsReached()) {  +        if (MaxRequestsReached()) {              Cb_->OnMaxConn();              bool wasRemoved = Connections->RemoveOld(TInstant::Max()); -            if (!wasRemoved && Options_.RejectExcessConnections) {  +            if (!wasRemoved && Options_.RejectExcessConnections) {                  (new TClientConnection(s, this, listenerSockAddrRef))->Reject(); -                return;  -            }  +                return; +            }          }          auto connection = new TClientConnection(s, this, listenerSockAddrRef); @@ -242,7 +242,7 @@ public:          // ignore result      } -    void AddRequest(TAutoPtr<TClientRequest> req, bool fail) {  +    void AddRequest(TAutoPtr<TClientRequest> req, bool fail) {          struct TFailRequest: public THttpClientRequestEx {              inline TFailRequest(TAutoPtr<TClientRequest> parent) {                  Conn_.Reset(parent->Conn_.Release()); @@ -259,7 +259,7 @@ public:              }          }; -        if (!fail && Requests->Add(req.Get())) {  +        if (!fail && Requests->Add(req.Get())) {              Y_UNUSED(req.Release());          } else {              req = new TFailRequest(req); @@ -438,10 +438,10 @@ public:          return AtomicGet(ConnectionCount);      } -    inline bool MaxRequestsReached() const {  +    inline bool MaxRequestsReached() const {          return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections); -    }  -  +    } +      THolder<TThread> ListenThread;      TPipeHandle ListenWakeupReadFd;      TPipeHandle ListenWakeupWriteFd; @@ -547,10 +547,10 @@ const IThreadPool& THttpServer::GetFailQueue() const {      return Impl_->GetFailQueue();  } -bool THttpServer::MaxRequestsReached() const {  -    return Impl_->MaxRequestsReached();  -}  -  +bool THttpServer::MaxRequestsReached() const { +    return Impl_->MaxRequestsReached(); +} +  TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef)      : Socket_(s)      , ListenerSockAddrRef_(listenerSockAddrRef) @@ -591,7 +591,7 @@ void TClientConnection::OnPollEvent(TInstant now) {      THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_));      AcceptMoment = now; -    HttpServ_->AddRequest(obj, Reject_);  +    HttpServ_->AddRequest(obj, Reject_);  }  void TClientConnection::Activate(TInstant now) noexcept { @@ -604,12 +604,12 @@ void TClientConnection::DeActivate() {      HttpServ_->Connections->Add(this);  } -void TClientConnection::Reject() {  -    Reject_ = true;  -  -    HttpServ_->Connections->Add(this);  -}  -  +void TClientConnection::Reject() { +    Reject_ = true; + +    HttpServ_->Connections->Add(this); +} +  TClientRequest::TClientRequest() {  } diff --git a/library/cpp/http/server/http.h b/library/cpp/http/server/http.h index 1639e3ff59f..b292d38f270 100644 --- a/library/cpp/http/server/http.h +++ b/library/cpp/http/server/http.h @@ -93,9 +93,9 @@ public:      static TAtomicBase AcceptReturnsInvalidSocketCounter();  private: -    bool MaxRequestsReached() const;  -  -private:  +    bool MaxRequestsReached() const; + +private:      THolder<TImpl> Impl_;  }; diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp index 1674efa4d21..cc62bb988e7 100644 --- a/library/cpp/http/server/http_ut.cpp +++ b/library/cpp/http/server/http_ut.cpp @@ -8,7 +8,7 @@  #include <util/stream/output.h>  #include <util/stream/zlib.h>  #include <util/system/datetime.h> -#include <util/system/sem.h>  +#include <util/system/sem.h>  Y_UNIT_TEST_SUITE(THttpServerTest) {      class TEchoServer: public THttpServer::ICallBack { @@ -53,89 +53,89 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {          TString Res_;      }; -    class TSleepingServer: public THttpServer::ICallBack {  -        class TReplier: public TRequestReplier {  -        public:  -            inline TReplier(TSleepingServer* server)  -                : Server(server)  -            {  -            }  -  -            bool DoReply(const TReplyParams& params) override {  -                Server->FreeThread();  -                Server->Busy(1);  -                params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo");  -                params.Output.Finish();  -                Server->Replies->Inc();  -                return true;  -            }  -  -        private:  -            TSleepingServer* Server = nullptr;  -        };  -  -    public:  -        inline TSleepingServer(unsigned int size)  -            : Semaphore("conns", size)  -            , Semaphore2("threads", 1)  -            , Replies(new TAtomicCounter())  -            , MaxConns(new TAtomicCounter())  -        {  -        }  -  -        void ResetCounters() {  -            Replies.Reset(new TAtomicCounter());  -            MaxConns.Reset(new TAtomicCounter());  -        }  -  -        long RepliesCount() const {  -            return Replies->Val();  -        }  -  -        long MaxConnsCount() const {  -            return MaxConns->Val();  -        }  -  -        TClientRequest* CreateClient() override {  -            return new TReplier(this);  -        }  -  -        void OnMaxConn() override {  -            MaxConns->Inc();  -        }  -  -        void OnFailRequest(int) override {  -            FreeThread();  -            Busy(1);  -        }  -  -        void Busy(int count) {  -            while (count-- > 0) {  -                Semaphore.Acquire();  -            }  -        }  -  -        void BusyThread() {  -            Semaphore2.Acquire();  -        }  -  -        void Free(int count) {  -            while (count-- > 0) {  -                Semaphore.Release();  -            }  -        }  -  -        void FreeThread() {  -            Semaphore2.Release();  -        }  -  -    private:  -        TSemaphore Semaphore;  -        TSemaphore Semaphore2;  -        THolder<TAtomicCounter> Replies;  -        THolder<TAtomicCounter> MaxConns;  -    };  -  +    class TSleepingServer: public THttpServer::ICallBack { +        class TReplier: public TRequestReplier { +        public: +            inline TReplier(TSleepingServer* server) +                : Server(server) +            { +            } + +            bool DoReply(const TReplyParams& params) override { +                Server->FreeThread(); +                Server->Busy(1); +                params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo"); +                params.Output.Finish(); +                Server->Replies->Inc(); +                return true; +            } + +        private: +            TSleepingServer* Server = nullptr; +        }; + +    public: +        inline TSleepingServer(unsigned int size) +            : Semaphore("conns", size) +            , Semaphore2("threads", 1) +            , Replies(new TAtomicCounter()) +            , MaxConns(new TAtomicCounter()) +        { +        } + +        void ResetCounters() { +            Replies.Reset(new TAtomicCounter()); +            MaxConns.Reset(new TAtomicCounter()); +        } + +        long RepliesCount() const { +            return Replies->Val(); +        } + +        long MaxConnsCount() const { +            return MaxConns->Val(); +        } + +        TClientRequest* CreateClient() override { +            return new TReplier(this); +        } + +        void OnMaxConn() override { +            MaxConns->Inc(); +        } + +        void OnFailRequest(int) override { +            FreeThread(); +            Busy(1); +        } + +        void Busy(int count) { +            while (count-- > 0) { +                Semaphore.Acquire(); +            } +        } + +        void BusyThread() { +            Semaphore2.Acquire(); +        } + +        void Free(int count) { +            while (count-- > 0) { +                Semaphore.Release(); +            } +        } + +        void FreeThread() { +            Semaphore2.Release(); +        } + +    private: +        TSemaphore Semaphore; +        TSemaphore Semaphore2; +        THolder<TAtomicCounter> Replies; +        THolder<TAtomicCounter> MaxConns; +    }; +      static const TString CrLf = "\r\n";      struct TTestRequest { @@ -166,10 +166,10 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {              TSocket* s = nullptr;              THolder<TSocket> singleReqSocket;              if (KeepAliveConnection) { -                if (!KeepAlivedSocket) {  +                if (!KeepAlivedSocket) {                      KeepAlivedSocket = MakeHolder<TSocket>(TNetworkAddress("localhost", Port), TDuration::Seconds(10)); -                }  -                s = KeepAlivedSocket.Get();  +                } +                s = KeepAlivedSocket.Get();              } else {                  TNetworkAddress addr("localhost", Port);                  singleReqSocket.Reset(new TSocket(addr, TDuration::Seconds(10))); @@ -283,7 +283,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {          TString ContentEncoding;          TString Content;          bool KeepAliveConnection = false; -        THolder<TSocket> KeepAlivedSocket;  +        THolder<TSocket> KeepAlivedSocket;          bool EnableResponseEncoding = false;          TString Hdr;          bool Expect100Continue = false; @@ -455,7 +455,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {          }          server.Stop();      } -  +      class TReleaseConnectionServer: public THttpServer::ICallBack {          class TRequest: public THttpClientRequestEx {          public: @@ -686,54 +686,54 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {  #if 0      Y_UNIT_TEST(TestSocketsLeak) { -        const bool trueFalse[] = {true, false};  -        TPortManager portManager;  -        const ui16 port = portManager.GetPort();  -        TString res = TestData(25);  -        TSleepingServer server(3);  -        THttpServer::TOptions options(port);  -        options.MaxConnections = 1;  -        options.MaxQueueSize = 1;  -        options.MaxFQueueSize = 2;  -        options.nFThreads = 2;  -        options.KeepAliveEnabled = true;  -        options.RejectExcessConnections = true;  -        THttpServer srv(&server, options);  -        UNIT_ASSERT(srv.Start());  -  -        for (bool keepAlive : trueFalse) {  -            server.ResetCounters();  +        const bool trueFalse[] = {true, false}; +        TPortManager portManager; +        const ui16 port = portManager.GetPort(); +        TString res = TestData(25); +        TSleepingServer server(3); +        THttpServer::TOptions options(port); +        options.MaxConnections = 1; +        options.MaxQueueSize = 1; +        options.MaxFQueueSize = 2; +        options.nFThreads = 2; +        options.KeepAliveEnabled = true; +        options.RejectExcessConnections = true; +        THttpServer srv(&server, options); +        UNIT_ASSERT(srv.Start()); + +        for (bool keepAlive : trueFalse) { +            server.ResetCounters();              TVector<TAutoPtr<IThreadFactory::IThread>> threads; -  -            server.Busy(3);  -            server.BusyThread();  -  -            for (size_t i = 0; i < 3; ++i) {  + +            server.Busy(3); +            server.BusyThread(); + +            for (size_t i = 0; i < 3; ++i) {                  auto func = [&server, port, keepAlive]() { -                    server.BusyThread();  +                    server.BusyThread();                      THolder<TTestRequest> r = MakeHolder<TTestRequest>(port); -                    r->KeepAliveConnection = keepAlive;  -                    r->Execute();  -                };  +                    r->KeepAliveConnection = keepAlive; +                    r->Execute(); +                };                  threads.push_back(SystemThreadFactory()->Run(func)); -            }  -  -            server.FreeThread(); // all threads get connection & go to processing  -            Sleep(TDuration::MilliSeconds(100));  -            server.BusyThread(); // we wait while connections are established by the  -                                 // system and accepted by the server  +            } + +            server.FreeThread(); // all threads get connection & go to processing +            Sleep(TDuration::MilliSeconds(100)); +            server.BusyThread(); // we wait while connections are established by the +                                 // system and accepted by the server              server.Free(3);      // we release all connections processing -  -            for (auto&& thread : threads) {  -                thread->Join();  -            }  -  -            server.Free(3);  -            server.FreeThread();  -  -            UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount()));  -            UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount()));  -        }  -    }  + +            for (auto&& thread : threads) { +                thread->Join(); +            } + +            server.Free(3); +            server.FreeThread(); + +            UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount())); +            UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount())); +        } +    }  #endif  } diff --git a/library/cpp/http/server/options.h b/library/cpp/http/server/options.h index fae4176ae05..38eda0e5e78 100644 --- a/library/cpp/http/server/options.h +++ b/library/cpp/http/server/options.h @@ -44,12 +44,12 @@ public:          return *this;      } -    inline THttpServerOptions& EnableRejectExcessConnections(bool enable) noexcept {  -        RejectExcessConnections = enable;  -  -        return *this;  -    }  -  +    inline THttpServerOptions& EnableRejectExcessConnections(bool enable) noexcept { +        RejectExcessConnections = enable; + +        return *this; +    } +      inline THttpServerOptions& EnableReusePort(bool enable) noexcept {          ReusePort = enable; @@ -148,7 +148,7 @@ public:      bool KeepAliveEnabled = true;      bool CompressionEnabled = false; -    bool RejectExcessConnections = false;  +    bool RejectExcessConnections = false;      bool ReusePort = false; // set SO_REUSEPORT socket option      bool ReuseAddress = true; // set SO_REUSEADDR socket option      TAddrs BindSockaddr; | 
