diff options
| author | markov <[email protected]> | 2022-02-10 16:49:38 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:49:38 +0300 | 
| commit | c35714eae9ffbb7be89036495af366fa51a8c097 (patch) | |
| tree | d5c744b4b3864e5587519941892e66cd0c9d72da | |
| parent | 71af077a5dfe7e9f932a508422c2dac81a57ebc0 (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 1 of 2.
| -rw-r--r-- | library/cpp/http/server/http.cpp | 48 | ||||
| -rw-r--r-- | library/cpp/http/server/http.h | 6 | ||||
| -rw-r--r-- | library/cpp/http/server/http_ut.cpp | 266 | ||||
| -rw-r--r-- | library/cpp/http/server/options.h | 14 | 
4 files changed, 167 insertions, 167 deletions
| diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index 128583bdd70..7ae2afe5537 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -59,13 +59,13 @@ public:      inline void Activate(TInstant now) noexcept;      inline void DeActivate(); -    inline void Reject(); +    inline void Reject();   public:      TSocket Socket_;      NAddr::IRemoteAddrRef ListenerSockAddrRef_;      THttpServer::TImpl* HttpServ_ = nullptr; -    bool Reject_ = false; +    bool Reject_ = false;       TInstant LastUsed;      TInstant AcceptMoment;      size_t ReceivedRequests = 0; @@ -112,7 +112,7 @@ public:          bool TryRemovingUnsafe(TInstant border) noexcept {              if (Conns_.Empty()) { -                return false; +                return false;               }              TClientConnection* c = &*(Conns_.Begin());              if (c->LastUsed > border) { @@ -120,7 +120,7 @@ public:              }              EraseUnsafe(c);              delete c; -            return true; +            return true;           }          void EraseUnsafe(TClientConnection* c) noexcept { @@ -154,13 +154,13 @@ public:      }      void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) { -        if (MaxRequestsReached()) { +        if (MaxRequestsReached()) {               Cb_->OnMaxConn();              bool wasRemoved = Connections->RemoveOld(TInstant::Max()); -            if (!wasRemoved && Options_.RejectExcessConnections) { +            if (!wasRemoved && Options_.RejectExcessConnections) {                   (new TClientConnection(s, this, listenerSockAddrRef))->Reject(); -                return; -            } +                return;  +            }           }          auto connection = new TClientConnection(s, this, listenerSockAddrRef); @@ -242,7 +242,7 @@ public:          // ignore result      } -    void AddRequest(TAutoPtr<TClientRequest> req, bool fail) { +    void AddRequest(TAutoPtr<TClientRequest> req, bool fail) {           struct TFailRequest: public THttpClientRequestEx {              inline TFailRequest(TAutoPtr<TClientRequest> parent) {                  Conn_.Reset(parent->Conn_.Release()); @@ -259,7 +259,7 @@ public:              }          }; -        if (!fail && Requests->Add(req.Get())) { +        if (!fail && Requests->Add(req.Get())) {               Y_UNUSED(req.Release());          } else {              req = new TFailRequest(req); @@ -438,10 +438,10 @@ public:          return AtomicGet(ConnectionCount);      } -    inline bool MaxRequestsReached() const { +    inline bool MaxRequestsReached() const {           return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections); -    } - +    }  +       THolder<TThread> ListenThread;      TPipeHandle ListenWakeupReadFd;      TPipeHandle ListenWakeupWriteFd; @@ -547,10 +547,10 @@ const IThreadPool& THttpServer::GetFailQueue() const {      return Impl_->GetFailQueue();  } -bool THttpServer::MaxRequestsReached() const { -    return Impl_->MaxRequestsReached(); -} - +bool THttpServer::MaxRequestsReached() const {  +    return Impl_->MaxRequestsReached();  +}  +   TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef)      : Socket_(s)      , ListenerSockAddrRef_(listenerSockAddrRef) @@ -591,7 +591,7 @@ void TClientConnection::OnPollEvent(TInstant now) {      THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_));      AcceptMoment = now; -    HttpServ_->AddRequest(obj, Reject_); +    HttpServ_->AddRequest(obj, Reject_);   }  void TClientConnection::Activate(TInstant now) noexcept { @@ -604,12 +604,12 @@ void TClientConnection::DeActivate() {      HttpServ_->Connections->Add(this);  } -void TClientConnection::Reject() { -    Reject_ = true; - -    HttpServ_->Connections->Add(this); -} - +void TClientConnection::Reject() {  +    Reject_ = true;  +  +    HttpServ_->Connections->Add(this);  +}  +   TClientRequest::TClientRequest() {  } diff --git a/library/cpp/http/server/http.h b/library/cpp/http/server/http.h index b292d38f270..1639e3ff59f 100644 --- a/library/cpp/http/server/http.h +++ b/library/cpp/http/server/http.h @@ -93,9 +93,9 @@ public:      static TAtomicBase AcceptReturnsInvalidSocketCounter();  private: -    bool MaxRequestsReached() const; - -private: +    bool MaxRequestsReached() const;  +  +private:       THolder<TImpl> Impl_;  }; diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp index cc62bb988e7..1674efa4d21 100644 --- a/library/cpp/http/server/http_ut.cpp +++ b/library/cpp/http/server/http_ut.cpp @@ -8,7 +8,7 @@  #include <util/stream/output.h>  #include <util/stream/zlib.h>  #include <util/system/datetime.h> -#include <util/system/sem.h> +#include <util/system/sem.h>   Y_UNIT_TEST_SUITE(THttpServerTest) {      class TEchoServer: public THttpServer::ICallBack { @@ -53,89 +53,89 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {          TString Res_;      }; -    class TSleepingServer: public THttpServer::ICallBack { -        class TReplier: public TRequestReplier { -        public: -            inline TReplier(TSleepingServer* server) -                : Server(server) -            { -            } - -            bool DoReply(const TReplyParams& params) override { -                Server->FreeThread(); -                Server->Busy(1); -                params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo"); -                params.Output.Finish(); -                Server->Replies->Inc(); -                return true; -            } - -        private: -            TSleepingServer* Server = nullptr; -        }; - -    public: -        inline TSleepingServer(unsigned int size) -            : Semaphore("conns", size) -            , Semaphore2("threads", 1) -            , Replies(new TAtomicCounter()) -            , MaxConns(new TAtomicCounter()) -        { -        } - -        void ResetCounters() { -            Replies.Reset(new TAtomicCounter()); -            MaxConns.Reset(new TAtomicCounter()); -        } - -        long RepliesCount() const { -            return Replies->Val(); -        } - -        long MaxConnsCount() const { -            return MaxConns->Val(); -        } - -        TClientRequest* CreateClient() override { -            return new TReplier(this); -        } - -        void OnMaxConn() override { -            MaxConns->Inc(); -        } - -        void OnFailRequest(int) override { -            FreeThread(); -            Busy(1); -        } - -        void Busy(int count) { -            while (count-- > 0) { -                Semaphore.Acquire(); -            } -        } - -        void BusyThread() { -            Semaphore2.Acquire(); -        } - -        void Free(int count) { -            while (count-- > 0) { -                Semaphore.Release(); -            } -        } - -        void FreeThread() { -            Semaphore2.Release(); -        } - -    private: -        TSemaphore Semaphore; -        TSemaphore Semaphore2; -        THolder<TAtomicCounter> Replies; -        THolder<TAtomicCounter> MaxConns; -    }; - +    class TSleepingServer: public THttpServer::ICallBack {  +        class TReplier: public TRequestReplier {  +        public:  +            inline TReplier(TSleepingServer* server)  +                : Server(server)  +            {  +            }  +  +            bool DoReply(const TReplyParams& params) override {  +                Server->FreeThread();  +                Server->Busy(1);  +                params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo");  +                params.Output.Finish();  +                Server->Replies->Inc();  +                return true;  +            }  +  +        private:  +            TSleepingServer* Server = nullptr;  +        };  +  +    public:  +        inline TSleepingServer(unsigned int size)  +            : Semaphore("conns", size)  +            , Semaphore2("threads", 1)  +            , Replies(new TAtomicCounter())  +            , MaxConns(new TAtomicCounter())  +        {  +        }  +  +        void ResetCounters() {  +            Replies.Reset(new TAtomicCounter());  +            MaxConns.Reset(new TAtomicCounter());  +        }  +  +        long RepliesCount() const {  +            return Replies->Val();  +        }  +  +        long MaxConnsCount() const {  +            return MaxConns->Val();  +        }  +  +        TClientRequest* CreateClient() override {  +            return new TReplier(this);  +        }  +  +        void OnMaxConn() override {  +            MaxConns->Inc();  +        }  +  +        void OnFailRequest(int) override {  +            FreeThread();  +            Busy(1);  +        }  +  +        void Busy(int count) {  +            while (count-- > 0) {  +                Semaphore.Acquire();  +            }  +        }  +  +        void BusyThread() {  +            Semaphore2.Acquire();  +        }  +  +        void Free(int count) {  +            while (count-- > 0) {  +                Semaphore.Release();  +            }  +        }  +  +        void FreeThread() {  +            Semaphore2.Release();  +        }  +  +    private:  +        TSemaphore Semaphore;  +        TSemaphore Semaphore2;  +        THolder<TAtomicCounter> Replies;  +        THolder<TAtomicCounter> MaxConns;  +    };  +       static const TString CrLf = "\r\n";      struct TTestRequest { @@ -166,10 +166,10 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {              TSocket* s = nullptr;              THolder<TSocket> singleReqSocket;              if (KeepAliveConnection) { -                if (!KeepAlivedSocket) { +                if (!KeepAlivedSocket) {                       KeepAlivedSocket = MakeHolder<TSocket>(TNetworkAddress("localhost", Port), TDuration::Seconds(10)); -                } -                s = KeepAlivedSocket.Get(); +                }  +                s = KeepAlivedSocket.Get();               } else {                  TNetworkAddress addr("localhost", Port);                  singleReqSocket.Reset(new TSocket(addr, TDuration::Seconds(10))); @@ -283,7 +283,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {          TString ContentEncoding;          TString Content;          bool KeepAliveConnection = false; -        THolder<TSocket> KeepAlivedSocket; +        THolder<TSocket> KeepAlivedSocket;           bool EnableResponseEncoding = false;          TString Hdr;          bool Expect100Continue = false; @@ -455,7 +455,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {          }          server.Stop();      } - +       class TReleaseConnectionServer: public THttpServer::ICallBack {          class TRequest: public THttpClientRequestEx {          public: @@ -686,54 +686,54 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {  #if 0      Y_UNIT_TEST(TestSocketsLeak) { -        const bool trueFalse[] = {true, false}; -        TPortManager portManager; -        const ui16 port = portManager.GetPort(); -        TString res = TestData(25); -        TSleepingServer server(3); -        THttpServer::TOptions options(port); -        options.MaxConnections = 1; -        options.MaxQueueSize = 1; -        options.MaxFQueueSize = 2; -        options.nFThreads = 2; -        options.KeepAliveEnabled = true; -        options.RejectExcessConnections = true; -        THttpServer srv(&server, options); -        UNIT_ASSERT(srv.Start()); - -        for (bool keepAlive : trueFalse) { -            server.ResetCounters(); +        const bool trueFalse[] = {true, false};  +        TPortManager portManager;  +        const ui16 port = portManager.GetPort();  +        TString res = TestData(25);  +        TSleepingServer server(3);  +        THttpServer::TOptions options(port);  +        options.MaxConnections = 1;  +        options.MaxQueueSize = 1;  +        options.MaxFQueueSize = 2;  +        options.nFThreads = 2;  +        options.KeepAliveEnabled = true;  +        options.RejectExcessConnections = true;  +        THttpServer srv(&server, options);  +        UNIT_ASSERT(srv.Start());  +  +        for (bool keepAlive : trueFalse) {  +            server.ResetCounters();               TVector<TAutoPtr<IThreadFactory::IThread>> threads; - -            server.Busy(3); -            server.BusyThread(); - -            for (size_t i = 0; i < 3; ++i) { +  +            server.Busy(3);  +            server.BusyThread();  +  +            for (size_t i = 0; i < 3; ++i) {                   auto func = [&server, port, keepAlive]() { -                    server.BusyThread(); +                    server.BusyThread();                       THolder<TTestRequest> r = MakeHolder<TTestRequest>(port); -                    r->KeepAliveConnection = keepAlive; -                    r->Execute(); -                }; +                    r->KeepAliveConnection = keepAlive;  +                    r->Execute();  +                };                   threads.push_back(SystemThreadFactory()->Run(func)); -            } - -            server.FreeThread(); // all threads get connection & go to processing -            Sleep(TDuration::MilliSeconds(100)); -            server.BusyThread(); // we wait while connections are established by the -                                 // system and accepted by the server +            }  +  +            server.FreeThread(); // all threads get connection & go to processing  +            Sleep(TDuration::MilliSeconds(100));  +            server.BusyThread(); // we wait while connections are established by the  +                                 // system and accepted by the server               server.Free(3);      // we release all connections processing - -            for (auto&& thread : threads) { -                thread->Join(); -            } - -            server.Free(3); -            server.FreeThread(); - -            UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount())); -            UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount())); -        } -    } +  +            for (auto&& thread : threads) {  +                thread->Join();  +            }  +  +            server.Free(3);  +            server.FreeThread();  +  +            UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount()));  +            UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount()));  +        }  +    }   #endif  } diff --git a/library/cpp/http/server/options.h b/library/cpp/http/server/options.h index 38eda0e5e78..fae4176ae05 100644 --- a/library/cpp/http/server/options.h +++ b/library/cpp/http/server/options.h @@ -44,12 +44,12 @@ public:          return *this;      } -    inline THttpServerOptions& EnableRejectExcessConnections(bool enable) noexcept { -        RejectExcessConnections = enable; - -        return *this; -    } - +    inline THttpServerOptions& EnableRejectExcessConnections(bool enable) noexcept {  +        RejectExcessConnections = enable;  +  +        return *this;  +    }  +       inline THttpServerOptions& EnableReusePort(bool enable) noexcept {          ReusePort = enable; @@ -148,7 +148,7 @@ public:      bool KeepAliveEnabled = true;      bool CompressionEnabled = false; -    bool RejectExcessConnections = false; +    bool RejectExcessConnections = false;       bool ReusePort = false; // set SO_REUSEPORT socket option      bool ReuseAddress = true; // set SO_REUSEADDR socket option      TAddrs BindSockaddr; | 
