diff options
| author | Anton Samokhvalov <[email protected]> | 2022-02-10 16:45:15 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:45:15 +0300 | 
| commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
| tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/http/server/http.cpp | |
| parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
Restoring authorship annotation for Anton Samokhvalov <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/http/server/http.cpp')
| -rw-r--r-- | library/cpp/http/server/http.cpp | 566 | 
1 files changed, 283 insertions, 283 deletions
| diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index 128583bdd70..581fc773995 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -1,12 +1,12 @@  #include "http.h"  #include "http_ex.h" - +   #include <library/cpp/threading/equeue/equeue.h>  #include <util/generic/buffer.h>  #include <util/generic/cast.h> -#include <util/generic/intrlist.h> -#include <util/generic/yexception.h> +#include <util/generic/intrlist.h>  +#include <util/generic/yexception.h>   #include <util/network/address.h>  #include <util/network/socket.h>  #include <util/network/poller.h> @@ -18,7 +18,7 @@  #include <util/system/pipe.h>  #include <util/system/thread.h>  #include <util/thread/factory.h> - +   #include <cerrno>  #include <cstring>  #include <ctime> @@ -28,39 +28,39 @@  using namespace NAddr; -namespace { -    class IPollAble { -    public: +namespace {  +    class IPollAble {  +    public:           inline IPollAble() noexcept { -        } - -        virtual ~IPollAble() { -        } - +        }  +  +        virtual ~IPollAble() {  +        }  +           virtual void OnPollEvent(TInstant now) = 0; -    }; +    };  +  +    struct TShouldStop {  +    };  -    struct TShouldStop { -    }; - -    struct TWakeupPollAble: public IPollAble { +    struct TWakeupPollAble: public IPollAble {           void OnPollEvent(TInstant) override { -            throw TShouldStop(); -        } -    }; -} - -class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> { +            throw TShouldStop();  +        }  +    };  +}  +  +class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> {   public:      TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef);      ~TClientConnection() override; - +       void OnPollEvent(TInstant now) override; - +       inline void Activate(TInstant now) noexcept;      inline void DeActivate();      inline void Reject(); - +   public:      TSocket Socket_;      NAddr::IRemoteAddrRef ListenerSockAddrRef_; @@ -69,28 +69,28 @@ public:      TInstant LastUsed;      TInstant AcceptMoment;      size_t ReceivedRequests = 0; -}; - -class THttpServer::TImpl { +};  +  +class THttpServer::TImpl {   public:      class TConnections { -    public: +    public:           inline TConnections(TSocketPoller* poller, const THttpServerOptions& options)              : Poller_(poller)              , Options(options)          {          } - +           inline ~TConnections() {          } - +           inline void Add(TClientConnection* c) noexcept {              TGuard<TMutex> g(Mutex_); - +               Conns_.PushBack(c);              Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c));          } - +           inline void Erase(TClientConnection* c, TInstant now) noexcept {              TGuard<TMutex> g(Mutex_);              EraseUnsafe(c); @@ -98,18 +98,18 @@ public:                  TryRemovingUnsafe(now - Options.ExpirationTimeout);              }          } - +           inline void Clear() noexcept {              TGuard<TMutex> g(Mutex_); - +               Conns_.Clear();          } - +           inline bool RemoveOld(TInstant border) noexcept {              TGuard<TMutex> g(Mutex_);              return TryRemovingUnsafe(border);          } - +           bool TryRemovingUnsafe(TInstant border) noexcept {              if (Conns_.Empty()) {                  return false; @@ -122,7 +122,7 @@ public:              delete c;              return true;          } - +           void EraseUnsafe(TClientConnection* c) noexcept {              Poller_->Unwait(c->Socket_);              c->Unlink(); @@ -134,7 +134,7 @@ public:          TSocketPoller* Poller_ = nullptr;          const THttpServerOptions& Options;      }; - +       static void* ListenSocketFunction(void* param) {          try {              ((TImpl*)param)->ListenSocket(); @@ -144,10 +144,10 @@ public:          return nullptr;      } - +       TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) {          THolder<TClientRequest> obj(Cb_->CreateClient()); - +           obj->Conn_.Reset(c.Release());          return obj; @@ -161,13 +161,13 @@ public:                  (new TClientConnection(s, this, listenerSockAddrRef))->Reject();                  return;              } -        } - +        }  +           auto connection = new TClientConnection(s, this, listenerSockAddrRef);          connection->LastUsed = now;          connection->DeActivate();      } - +       void SaveErrorCode() {          ErrorCode = WSAGetLastError();      } @@ -175,11 +175,11 @@ public:      int GetErrorCode() const {          return ErrorCode;      } - +       const char* GetError() const {          return LastSystemErrorText(ErrorCode);      } - +       bool Start() {          Poller.Reset(new TSocketPoller());          Connections.Reset(new TConnections(Poller.Get(), Options_)); @@ -202,14 +202,14 @@ public:          } catch (const yexception&) {              SaveErrorCode();              return false; -        } - +        }  +           // Wait until the thread has completely started and return the success indicator          ListenStartEvent.Wait(); - +           return ListenerRunningOK;      } - +       void Wait() {          Cb_->OnWait();          TGuard<TMutex> g(StopMutex); @@ -227,46 +227,46 @@ public:              ListenThread->Join();              ListenThread.Reset(nullptr);          } - +           while (ConnectionCount) {              usleep(10000);              Connections->Clear(); -        } - +        }  +           Connections.Destroy();          Poller.Destroy();      } - +       void Shutdown() {          ListenWakeupWriteFd.Write("", 1);          // ignore result      } - +       void AddRequest(TAutoPtr<TClientRequest> req, bool fail) {          struct TFailRequest: public THttpClientRequestEx { -            inline TFailRequest(TAutoPtr<TClientRequest> parent) { -                Conn_.Reset(parent->Conn_.Release()); -                HttpConn_.Reset(parent->HttpConn_.Release()); -            } - +            inline TFailRequest(TAutoPtr<TClientRequest> parent) {  +                Conn_.Reset(parent->Conn_.Release());  +                HttpConn_.Reset(parent->HttpConn_.Release());  +            }  +               bool Reply(void*) override {                  if (!ProcessHeaders()) {                      return true;                  } -                ProcessFailRequest(0); -                return true; -            } +                ProcessFailRequest(0);  +                return true;  +            }           }; - +           if (!fail && Requests->Add(req.Get())) {              Y_UNUSED(req.Release());          } else {              req = new TFailRequest(req); - +               if (FailRequests->Add(req.Get())) {                  Y_UNUSED(req.Release()); -            } else { +            } else {                   Cb_->OnFailRequest(-1);              }          } @@ -288,44 +288,44 @@ public:          return *FailRequests;      } -    class TListenSocket: public IPollAble, public TIntrusiveListItem<TListenSocket> { -    public: -        inline TListenSocket(const TSocket& s, TImpl* parent) -            : S_(s) -            , Server_(parent) +    class TListenSocket: public IPollAble, public TIntrusiveListItem<TListenSocket> {  +    public:  +        inline TListenSocket(const TSocket& s, TImpl* parent)  +            : S_(s)  +            , Server_(parent)               , SockAddrRef_(GetSockAddr(S_)) -        { -        } - +        {  +        }  +           ~TListenSocket() override { -        } - +        }  +           void OnPollEvent(TInstant) override {              SOCKET s = ::accept(S_, nullptr, nullptr); - -            if (s == INVALID_SOCKET) { -                ythrow yexception() << "accept: " << LastSystemErrorText(); +  +            if (s == INVALID_SOCKET) {  +                ythrow yexception() << "accept: " << LastSystemErrorText();               }              Server_->AddRequestFromSocket(s, TInstant::Now(), SockAddrRef_); -        } +        }           SOCKET GetSocket() const noexcept { -            return S_; -        } - -    private: -        TSocket S_; +            return S_;  +        }  +  +    private:  +        TSocket S_;           TImpl* Server_ = nullptr;          NAddr::IRemoteAddrRef SockAddrRef_;      }; - +       void ListenSocket() {          TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str());          ErrorCode = 0;          TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs; - +           std::function<void(TSocket)> callback = [&](TSocket socket) {              THolder<TListenSocket> ls(new TListenSocket(socket, this));              Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get())); @@ -335,7 +335,7 @@ public:          if (!addressesBound) {              SaveErrorCode();              ListenStartEvent.Signal(); - +               return;          } @@ -347,18 +347,18 @@ public:          TVector<void*> events;          events.resize(1); - +           TInstant now = TInstant::Now(); -        for (;;) { +        for (;;) {               try {                  const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout;                  const size_t ret = Poller->WaitD(events.data(), events.size(), deadline); - +                   now = TInstant::Now();                  for (size_t i = 0; i < ret; ++i) {                      ((IPollAble*)events[i])->OnPollEvent(now);                  } - +                   if (ret == 0 && Options_.ExpirationTimeout > TDuration::Zero()) {                      Connections->RemoveOld(now - Options_.ExpirationTimeout);                  } @@ -370,26 +370,26 @@ public:                  if (!Options_.MaxConnections && Options_.ExpirationTimeout == TDuration::Zero()) {                      if (ret >= events.size()) {                          events.resize(ret * 2); -                    } -                } -            } catch (const TShouldStop&) { -                break; +                    }  +                }  +            } catch (const TShouldStop&) {  +                break;               } catch (...) {                  Cb_->OnException(); -            } -        } +            }  +        }           while (!Reqs.Empty()) {              THolder<TListenSocket> ls(Reqs.PopFront()); - +               Poller->Unwait(ls->GetSocket()); -        } - +        }  +           Requests->Stop();          FailRequests->Stop();          Cb_->OnListenStop();      } - +       void RestartRequestThreads(ui32 nTh, ui32 maxQS) {          Requests->Stop();          Options_.nThreads = nTh; @@ -408,24 +408,24 @@ public:      TImpl(THttpServer* parent, ICallBack* cb, const TOptions& options, IThreadFactory* factory)          : TImpl( -              parent, -              cb, +              parent,  +              cb,                 MakeThreadPool<TSimpleThreadPool>(factory, options.UseElasticQueues, cb, options.RequestsThreadName),                MakeThreadPool<TThreadPool>(factory, options.UseElasticQueues, nullptr, options.FailRequestsThreadName), -              options) { +              options) {       }      ~TImpl() {          try {              Stop();          } catch (...) { -        } +        }       } - +       inline const TOptions& Options() const noexcept {          return Options_;      } - +       inline void DecreaseConnections() noexcept {          AtomicDecrement(ConnectionCount);      } @@ -439,7 +439,7 @@ public:      }      inline bool MaxRequestsReached() const { -        return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections); +        return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections);       }      THolder<TThread> ListenThread; @@ -480,57 +480,57 @@ private:          return pool;      } -}; +};   THttpServer::THttpServer(ICallBack* cb, const TOptions& options, IThreadFactory* pool)      : Impl_(new TImpl(this, cb, options, pool)) -{ -} - +{  +}  +   THttpServer::THttpServer(ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options)      : Impl_(new TImpl(this, cb, mainWorkers, failWorkers, options))  {  } -THttpServer::~THttpServer() { -} - +THttpServer::~THttpServer() {  +}  +   i64 THttpServer::GetClientCount() const {      return Impl_->GetClientCount();  } -bool THttpServer::Start() { -    return Impl_->Start(); -} - -void THttpServer::Stop() { -    Impl_->Stop(); -} - -void THttpServer::Shutdown() { -    Impl_->Shutdown(); -} - -void THttpServer::Wait() { -    Impl_->Wait(); -} - -int THttpServer::GetErrorCode() { -    return Impl_->GetErrorCode(); -} - -const char* THttpServer::GetError() { -    return Impl_->GetError(); -} - -void THttpServer::RestartRequestThreads(ui32 n, ui32 queue) { -    Impl_->RestartRequestThreads(n, queue); -} - +bool THttpServer::Start() {  +    return Impl_->Start();  +}  +  +void THttpServer::Stop() {  +    Impl_->Stop();  +}  +  +void THttpServer::Shutdown() {  +    Impl_->Shutdown();  +}  +  +void THttpServer::Wait() {  +    Impl_->Wait();  +}  +  +int THttpServer::GetErrorCode() {  +    return Impl_->GetErrorCode();  +}  +  +const char* THttpServer::GetError() {  +    return Impl_->GetError();  +}  +  +void THttpServer::RestartRequestThreads(ui32 n, ui32 queue) {  +    Impl_->RestartRequestThreads(n, queue);  +}  +   const THttpServer::TOptions& THttpServer::Options() const noexcept { -    return Impl_->Options(); -} - +    return Impl_->Options();  +}  +   size_t THttpServer::GetRequestQueueSize() const {      return Impl_->GetRequestQueueSize();  } @@ -552,45 +552,45 @@ bool THttpServer::MaxRequestsReached() const {  }  TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef) -    : Socket_(s) +    : Socket_(s)       , ListenerSockAddrRef_(listenerSockAddrRef) -    , HttpServ_(serv) -{ -    SetNoDelay(Socket_, true); - +    , HttpServ_(serv)  +{  +    SetNoDelay(Socket_, true);  +       const TDuration& clientTimeout = HttpServ_->Options().ClientTimeout;      if (clientTimeout != TDuration::Zero()) {          SetSocketTimeout(Socket_, (long)clientTimeout.Seconds(), clientTimeout.MilliSecondsOfSecond());      } -    HttpServ_->IncreaseConnections(); -} - +    HttpServ_->IncreaseConnections();  +}  +   TClientConnection::~TClientConnection() { -    HttpServ_->DecreaseConnections(); -} - +    HttpServ_->DecreaseConnections();  +}  +   void TClientConnection::OnPollEvent(TInstant now) {      THolder<TClientConnection> this_(this);      Activate(now); - -    { -        char tmp[1]; - -        if (::recv(Socket_, tmp, 1, MSG_PEEK) < 1) { -            /* -             * We can received a FIN so our socket was moved to -             * TCP_CLOSE_WAIT state. Check it before adding work -             * for this socket. -             */ - -            return; -        } -    } - +  +    {  +        char tmp[1];  +  +        if (::recv(Socket_, tmp, 1, MSG_PEEK) < 1) {  +            /*  +             * We can received a FIN so our socket was moved to  +             * TCP_CLOSE_WAIT state. Check it before adding work  +             * for this socket.  +             */  +  +            return;  +        }  +    } +       THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_));      AcceptMoment = now; - +       HttpServ_->AddRequest(obj, Reject_);  } @@ -600,35 +600,35 @@ void TClientConnection::Activate(TInstant now) noexcept {      ++ReceivedRequests;  } -void TClientConnection::DeActivate() { +void TClientConnection::DeActivate() {       HttpServ_->Connections->Add(this); -} - +}  +   void TClientConnection::Reject() {      Reject_ = true;      HttpServ_->Connections->Add(this);  } -TClientRequest::TClientRequest() { -} - -TClientRequest::~TClientRequest() { -} - -bool TClientRequest::Reply(void* /*ThreadSpecificResource*/) { +TClientRequest::TClientRequest() {  +}  +  +TClientRequest::~TClientRequest() {  +}  +  +bool TClientRequest::Reply(void* /*ThreadSpecificResource*/) {       if (strnicmp(RequestString.data(), "GET ", 4)) { -        Output() << "HTTP/1.0 501 Not Implemented\r\n\r\n"; -    } else { -        Output() << "HTTP/1.0 200 OK\r\n" -                    "Content-Type: text/html\r\n" -                    "\r\n" -                    "Hello World!\r\n"; -    } - -    return true; -} - +        Output() << "HTTP/1.0 501 Not Implemented\r\n\r\n";  +    } else {  +        Output() << "HTTP/1.0 200 OK\r\n"  +                    "Content-Type: text/html\r\n"  +                    "\r\n"  +                    "Hello World!\r\n";  +    }  +  +    return true;  +}  +   bool TClientRequest::IsLocal() const {      return HasLocalAddress(Socket());  } @@ -639,29 +639,29 @@ bool TClientRequest::CheckLoopback() {      try {          isLocal = IsLocal();      } catch (const yexception& e) { -        Output() << "HTTP/1.0 500 Oops\r\n\r\n" -                 << e.what() << "\r\n"; -        return false; +        Output() << "HTTP/1.0 500 Oops\r\n\r\n"  +                 << e.what() << "\r\n";  +        return false;       }      if (!isLocal) { -        Output() << "HTTP/1.0 403 Permission denied\r\n" -                    "Content-Type: text/html; charset=windows-1251\r\n" -                    "Connection: close\r\n" -                    "\r\n" -                    "<html><head><title>Permission denied</title></head>" -                    "<body><h1>Permission denied</h1>" -                    "<p>This request must be sent from the localhost.</p>" -                    "</body></html>\r\n"; - -        return false; -    } - -    return true; -} - +        Output() << "HTTP/1.0 403 Permission denied\r\n"  +                    "Content-Type: text/html; charset=windows-1251\r\n"  +                    "Connection: close\r\n"  +                    "\r\n"  +                    "<html><head><title>Permission denied</title></head>"  +                    "<body><h1>Permission denied</h1>"  +                    "<p>This request must be sent from the localhost.</p>"  +                    "</body></html>\r\n";  +  +        return false;  +    }  +  +    return true;  +}  +   void TClientRequest::ReleaseConnection() { -    if (Conn_ && HttpConn_ && HttpServ()->Options().KeepAliveEnabled && HttpConn_->CanBeKeepAlive() && (!HttpServ()->Options().RejectExcessConnections || !HttpServ()->MaxRequestsReached())) { +    if (Conn_ && HttpConn_ && HttpServ()->Options().KeepAliveEnabled && HttpConn_->CanBeKeepAlive() && (!HttpServ()->Options().RejectExcessConnections || !HttpServ()->MaxRequestsReached())) {           Output().Finish();          Conn_->DeActivate();          Y_UNUSED(Conn_.Release()); @@ -676,101 +676,101 @@ void TClientRequest::ResetConnection() {      }  } -void TClientRequest::Process(void* ThreadSpecificResource) { +void TClientRequest::Process(void* ThreadSpecificResource) {       THolder<TClientRequest> this_(this); - +       auto* serverImpl = Conn_->HttpServ_; -    try { -        if (!HttpConn_) { +    try {  +        if (!HttpConn_) {               const size_t outputBufferSize = HttpServ()->Options().OutputBufferSize;              if (outputBufferSize) {                  HttpConn_.Reset(new THttpServerConn(Socket(), outputBufferSize));              } else {                  HttpConn_.Reset(new THttpServerConn(Socket()));              } - +               auto maxRequestsPerConnection = HttpServ()->Options().MaxRequestsPerConnection;              HttpConn_->Output()->EnableKeepAlive(HttpServ()->Options().KeepAliveEnabled && (!maxRequestsPerConnection || Conn_->ReceivedRequests < maxRequestsPerConnection)); -            HttpConn_->Output()->EnableCompression(HttpServ()->Options().CompressionEnabled); -        } - +            HttpConn_->Output()->EnableCompression(HttpServ()->Options().CompressionEnabled);  +        }  +           if (ParsedHeaders.empty()) {              RequestString = Input().FirstLine(); - -            const THttpHeaders& h = Input().Headers(); +  +            const THttpHeaders& h = Input().Headers();               ParsedHeaders.reserve(h.Count()); -            for (THttpHeaders::TConstIterator it = h.Begin(); it != h.End(); ++it) { +            for (THttpHeaders::TConstIterator it = h.Begin(); it != h.End(); ++it) {                   ParsedHeaders.emplace_back(it->Name(), it->Value()); -            } -        } - -        if (Reply(ThreadSpecificResource)) { +            }  +        }  +  +        if (Reply(ThreadSpecificResource)) {               ReleaseConnection(); - -            /* -             * *this will be destroyed... -             */ - -            return; -        } -    } catch (...) { +  +            /*  +             * *this will be destroyed...  +             */  +  +            return;  +        }  +    } catch (...) {           serverImpl->Cb_->OnException(); - -        throw; -    } - +  +        throw;  +    }  +       Y_UNUSED(this_.Release()); -} - -void TClientRequest::ProcessFailRequest(int failstate) { +}  +  +void TClientRequest::ProcessFailRequest(int failstate) {       Output() << "HTTP/1.1 503 Service Unavailable\r\n"                  "Content-Type: text/plain\r\n"                  "Content-Length: 21\r\n"                  "\r\n"                  "Service Unavailable\r\n"; - +       TString url; - +       if (!strnicmp(RequestString.data(), "GET ", 4)) { -        // Trying to extract url... +        // Trying to extract url...           const char* str = RequestString.data(); - -        // Skipping spaces before url... -        size_t start = 3; +  +        // Skipping spaces before url...  +        size_t start = 3;           while (str[start] == ' ') { -            ++start; -        } - -        if (str[start]) { -            // Traversing url... -            size_t idx = start; - -            while (str[idx] != ' ' && str[idx]) { -                ++idx; -            } - +            ++start;  +        }  +  +        if (str[start]) {  +            // Traversing url...  +            size_t idx = start;  +  +            while (str[idx] != ' ' && str[idx]) {  +                ++idx;  +            }  +               url = RequestString.substr(start, idx - start); -        } -    } - -    const THttpServer::ICallBack::TFailLogData d = { -        failstate, -        url, -    }; - -    // Handling failure... -    Conn_->HttpServ_->Cb_->OnFailRequestEx(d); -    Output().Flush(); -} - +        }  +    }  +  +    const THttpServer::ICallBack::TFailLogData d = {  +        failstate,  +        url,  +    };  +  +    // Handling failure...  +    Conn_->HttpServ_->Cb_->OnFailRequestEx(d);  +    Output().Flush();  +}  +   THttpServer* TClientRequest::HttpServ() const noexcept { -    return Conn_->HttpServ_->Parent_; -} - +    return Conn_->HttpServ_->Parent_;  +}  +   const TSocket& TClientRequest::Socket() const noexcept { -    return Conn_->Socket_; -} +    return Conn_->Socket_;  +}   NAddr::IRemoteAddrRef TClientRequest::GetListenerSockAddrRef() const noexcept {      return Conn_->ListenerSockAddrRef_; @@ -791,8 +791,8 @@ TRequestReplier::~TRequestReplier() {  bool TRequestReplier::Reply(void* threadSpecificResource) {      const TReplyParams params = { -        threadSpecificResource, Input(), Output()}; - +        threadSpecificResource, Input(), Output()};  +       return DoReply(params);  } @@ -833,7 +833,7 @@ bool TryToBindAddresses(const THttpServerOptions& options, const std::function<v                  return false;              } -            if (callbackOnBoundAddress != nullptr) { +            if (callbackOnBoundAddress != nullptr) {                   (*callbackOnBoundAddress)(socket);              }          } | 
