diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/http/server/http.cpp | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/http/server/http.cpp')
-rw-r--r-- | library/cpp/http/server/http.cpp | 566 |
1 files changed, 283 insertions, 283 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index 128583bdd7..581fc77399 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -1,12 +1,12 @@ #include "http.h" #include "http_ex.h" - + #include <library/cpp/threading/equeue/equeue.h> #include <util/generic/buffer.h> #include <util/generic/cast.h> -#include <util/generic/intrlist.h> -#include <util/generic/yexception.h> +#include <util/generic/intrlist.h> +#include <util/generic/yexception.h> #include <util/network/address.h> #include <util/network/socket.h> #include <util/network/poller.h> @@ -18,7 +18,7 @@ #include <util/system/pipe.h> #include <util/system/thread.h> #include <util/thread/factory.h> - + #include <cerrno> #include <cstring> #include <ctime> @@ -28,39 +28,39 @@ using namespace NAddr; -namespace { - class IPollAble { - public: +namespace { + class IPollAble { + public: inline IPollAble() noexcept { - } - - virtual ~IPollAble() { - } - + } + + virtual ~IPollAble() { + } + virtual void OnPollEvent(TInstant now) = 0; - }; + }; + + struct TShouldStop { + }; - struct TShouldStop { - }; - - struct TWakeupPollAble: public IPollAble { + struct TWakeupPollAble: public IPollAble { void OnPollEvent(TInstant) override { - throw TShouldStop(); - } - }; -} - -class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> { + throw TShouldStop(); + } + }; +} + +class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> { public: TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef); ~TClientConnection() override; - + void OnPollEvent(TInstant now) override; - + inline void Activate(TInstant now) noexcept; inline void DeActivate(); inline void Reject(); - + public: TSocket Socket_; NAddr::IRemoteAddrRef ListenerSockAddrRef_; @@ -69,28 +69,28 @@ public: TInstant LastUsed; TInstant AcceptMoment; size_t ReceivedRequests = 0; -}; - -class THttpServer::TImpl { +}; + +class THttpServer::TImpl { public: class TConnections { - public: + public: inline TConnections(TSocketPoller* poller, const THttpServerOptions& options) : Poller_(poller) , Options(options) { } - + inline ~TConnections() { } - + inline void Add(TClientConnection* c) noexcept { TGuard<TMutex> g(Mutex_); - + Conns_.PushBack(c); Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c)); } - + inline void Erase(TClientConnection* c, TInstant now) noexcept { TGuard<TMutex> g(Mutex_); EraseUnsafe(c); @@ -98,18 +98,18 @@ public: TryRemovingUnsafe(now - Options.ExpirationTimeout); } } - + inline void Clear() noexcept { TGuard<TMutex> g(Mutex_); - + Conns_.Clear(); } - + inline bool RemoveOld(TInstant border) noexcept { TGuard<TMutex> g(Mutex_); return TryRemovingUnsafe(border); } - + bool TryRemovingUnsafe(TInstant border) noexcept { if (Conns_.Empty()) { return false; @@ -122,7 +122,7 @@ public: delete c; return true; } - + void EraseUnsafe(TClientConnection* c) noexcept { Poller_->Unwait(c->Socket_); c->Unlink(); @@ -134,7 +134,7 @@ public: TSocketPoller* Poller_ = nullptr; const THttpServerOptions& Options; }; - + static void* ListenSocketFunction(void* param) { try { ((TImpl*)param)->ListenSocket(); @@ -144,10 +144,10 @@ public: return nullptr; } - + TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) { THolder<TClientRequest> obj(Cb_->CreateClient()); - + obj->Conn_.Reset(c.Release()); return obj; @@ -161,13 +161,13 @@ public: (new TClientConnection(s, this, listenerSockAddrRef))->Reject(); return; } - } - + } + auto connection = new TClientConnection(s, this, listenerSockAddrRef); connection->LastUsed = now; connection->DeActivate(); } - + void SaveErrorCode() { ErrorCode = WSAGetLastError(); } @@ -175,11 +175,11 @@ public: int GetErrorCode() const { return ErrorCode; } - + const char* GetError() const { return LastSystemErrorText(ErrorCode); } - + bool Start() { Poller.Reset(new TSocketPoller()); Connections.Reset(new TConnections(Poller.Get(), Options_)); @@ -202,14 +202,14 @@ public: } catch (const yexception&) { SaveErrorCode(); return false; - } - + } + // Wait until the thread has completely started and return the success indicator ListenStartEvent.Wait(); - + return ListenerRunningOK; } - + void Wait() { Cb_->OnWait(); TGuard<TMutex> g(StopMutex); @@ -227,46 +227,46 @@ public: ListenThread->Join(); ListenThread.Reset(nullptr); } - + while (ConnectionCount) { usleep(10000); Connections->Clear(); - } - + } + Connections.Destroy(); Poller.Destroy(); } - + void Shutdown() { ListenWakeupWriteFd.Write("", 1); // ignore result } - + void AddRequest(TAutoPtr<TClientRequest> req, bool fail) { struct TFailRequest: public THttpClientRequestEx { - inline TFailRequest(TAutoPtr<TClientRequest> parent) { - Conn_.Reset(parent->Conn_.Release()); - HttpConn_.Reset(parent->HttpConn_.Release()); - } - + inline TFailRequest(TAutoPtr<TClientRequest> parent) { + Conn_.Reset(parent->Conn_.Release()); + HttpConn_.Reset(parent->HttpConn_.Release()); + } + bool Reply(void*) override { if (!ProcessHeaders()) { return true; } - ProcessFailRequest(0); - return true; - } + ProcessFailRequest(0); + return true; + } }; - + if (!fail && Requests->Add(req.Get())) { Y_UNUSED(req.Release()); } else { req = new TFailRequest(req); - + if (FailRequests->Add(req.Get())) { Y_UNUSED(req.Release()); - } else { + } else { Cb_->OnFailRequest(-1); } } @@ -288,44 +288,44 @@ public: return *FailRequests; } - class TListenSocket: public IPollAble, public TIntrusiveListItem<TListenSocket> { - public: - inline TListenSocket(const TSocket& s, TImpl* parent) - : S_(s) - , Server_(parent) + class TListenSocket: public IPollAble, public TIntrusiveListItem<TListenSocket> { + public: + inline TListenSocket(const TSocket& s, TImpl* parent) + : S_(s) + , Server_(parent) , SockAddrRef_(GetSockAddr(S_)) - { - } - + { + } + ~TListenSocket() override { - } - + } + void OnPollEvent(TInstant) override { SOCKET s = ::accept(S_, nullptr, nullptr); - - if (s == INVALID_SOCKET) { - ythrow yexception() << "accept: " << LastSystemErrorText(); + + if (s == INVALID_SOCKET) { + ythrow yexception() << "accept: " << LastSystemErrorText(); } Server_->AddRequestFromSocket(s, TInstant::Now(), SockAddrRef_); - } + } SOCKET GetSocket() const noexcept { - return S_; - } - - private: - TSocket S_; + return S_; + } + + private: + TSocket S_; TImpl* Server_ = nullptr; NAddr::IRemoteAddrRef SockAddrRef_; }; - + void ListenSocket() { TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str()); ErrorCode = 0; TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs; - + std::function<void(TSocket)> callback = [&](TSocket socket) { THolder<TListenSocket> ls(new TListenSocket(socket, this)); Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get())); @@ -335,7 +335,7 @@ public: if (!addressesBound) { SaveErrorCode(); ListenStartEvent.Signal(); - + return; } @@ -347,18 +347,18 @@ public: TVector<void*> events; events.resize(1); - + TInstant now = TInstant::Now(); - for (;;) { + for (;;) { try { const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout; const size_t ret = Poller->WaitD(events.data(), events.size(), deadline); - + now = TInstant::Now(); for (size_t i = 0; i < ret; ++i) { ((IPollAble*)events[i])->OnPollEvent(now); } - + if (ret == 0 && Options_.ExpirationTimeout > TDuration::Zero()) { Connections->RemoveOld(now - Options_.ExpirationTimeout); } @@ -370,26 +370,26 @@ public: if (!Options_.MaxConnections && Options_.ExpirationTimeout == TDuration::Zero()) { if (ret >= events.size()) { events.resize(ret * 2); - } - } - } catch (const TShouldStop&) { - break; + } + } + } catch (const TShouldStop&) { + break; } catch (...) { Cb_->OnException(); - } - } + } + } while (!Reqs.Empty()) { THolder<TListenSocket> ls(Reqs.PopFront()); - + Poller->Unwait(ls->GetSocket()); - } - + } + Requests->Stop(); FailRequests->Stop(); Cb_->OnListenStop(); } - + void RestartRequestThreads(ui32 nTh, ui32 maxQS) { Requests->Stop(); Options_.nThreads = nTh; @@ -408,24 +408,24 @@ public: TImpl(THttpServer* parent, ICallBack* cb, const TOptions& options, IThreadFactory* factory) : TImpl( - parent, - cb, + parent, + cb, MakeThreadPool<TSimpleThreadPool>(factory, options.UseElasticQueues, cb, options.RequestsThreadName), MakeThreadPool<TThreadPool>(factory, options.UseElasticQueues, nullptr, options.FailRequestsThreadName), - options) { + options) { } ~TImpl() { try { Stop(); } catch (...) { - } + } } - + inline const TOptions& Options() const noexcept { return Options_; } - + inline void DecreaseConnections() noexcept { AtomicDecrement(ConnectionCount); } @@ -439,7 +439,7 @@ public: } inline bool MaxRequestsReached() const { - return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections); + return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections); } THolder<TThread> ListenThread; @@ -480,57 +480,57 @@ private: return pool; } -}; +}; THttpServer::THttpServer(ICallBack* cb, const TOptions& options, IThreadFactory* pool) : Impl_(new TImpl(this, cb, options, pool)) -{ -} - +{ +} + THttpServer::THttpServer(ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options) : Impl_(new TImpl(this, cb, mainWorkers, failWorkers, options)) { } -THttpServer::~THttpServer() { -} - +THttpServer::~THttpServer() { +} + i64 THttpServer::GetClientCount() const { return Impl_->GetClientCount(); } -bool THttpServer::Start() { - return Impl_->Start(); -} - -void THttpServer::Stop() { - Impl_->Stop(); -} - -void THttpServer::Shutdown() { - Impl_->Shutdown(); -} - -void THttpServer::Wait() { - Impl_->Wait(); -} - -int THttpServer::GetErrorCode() { - return Impl_->GetErrorCode(); -} - -const char* THttpServer::GetError() { - return Impl_->GetError(); -} - -void THttpServer::RestartRequestThreads(ui32 n, ui32 queue) { - Impl_->RestartRequestThreads(n, queue); -} - +bool THttpServer::Start() { + return Impl_->Start(); +} + +void THttpServer::Stop() { + Impl_->Stop(); +} + +void THttpServer::Shutdown() { + Impl_->Shutdown(); +} + +void THttpServer::Wait() { + Impl_->Wait(); +} + +int THttpServer::GetErrorCode() { + return Impl_->GetErrorCode(); +} + +const char* THttpServer::GetError() { + return Impl_->GetError(); +} + +void THttpServer::RestartRequestThreads(ui32 n, ui32 queue) { + Impl_->RestartRequestThreads(n, queue); +} + const THttpServer::TOptions& THttpServer::Options() const noexcept { - return Impl_->Options(); -} - + return Impl_->Options(); +} + size_t THttpServer::GetRequestQueueSize() const { return Impl_->GetRequestQueueSize(); } @@ -552,45 +552,45 @@ bool THttpServer::MaxRequestsReached() const { } TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef) - : Socket_(s) + : Socket_(s) , ListenerSockAddrRef_(listenerSockAddrRef) - , HttpServ_(serv) -{ - SetNoDelay(Socket_, true); - + , HttpServ_(serv) +{ + SetNoDelay(Socket_, true); + const TDuration& clientTimeout = HttpServ_->Options().ClientTimeout; if (clientTimeout != TDuration::Zero()) { SetSocketTimeout(Socket_, (long)clientTimeout.Seconds(), clientTimeout.MilliSecondsOfSecond()); } - HttpServ_->IncreaseConnections(); -} - + HttpServ_->IncreaseConnections(); +} + TClientConnection::~TClientConnection() { - HttpServ_->DecreaseConnections(); -} - + HttpServ_->DecreaseConnections(); +} + void TClientConnection::OnPollEvent(TInstant now) { THolder<TClientConnection> this_(this); Activate(now); - - { - char tmp[1]; - - if (::recv(Socket_, tmp, 1, MSG_PEEK) < 1) { - /* - * We can received a FIN so our socket was moved to - * TCP_CLOSE_WAIT state. Check it before adding work - * for this socket. - */ - - return; - } - } - + + { + char tmp[1]; + + if (::recv(Socket_, tmp, 1, MSG_PEEK) < 1) { + /* + * We can received a FIN so our socket was moved to + * TCP_CLOSE_WAIT state. Check it before adding work + * for this socket. + */ + + return; + } + } + THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_)); AcceptMoment = now; - + HttpServ_->AddRequest(obj, Reject_); } @@ -600,35 +600,35 @@ void TClientConnection::Activate(TInstant now) noexcept { ++ReceivedRequests; } -void TClientConnection::DeActivate() { +void TClientConnection::DeActivate() { HttpServ_->Connections->Add(this); -} - +} + void TClientConnection::Reject() { Reject_ = true; HttpServ_->Connections->Add(this); } -TClientRequest::TClientRequest() { -} - -TClientRequest::~TClientRequest() { -} - -bool TClientRequest::Reply(void* /*ThreadSpecificResource*/) { +TClientRequest::TClientRequest() { +} + +TClientRequest::~TClientRequest() { +} + +bool TClientRequest::Reply(void* /*ThreadSpecificResource*/) { if (strnicmp(RequestString.data(), "GET ", 4)) { - Output() << "HTTP/1.0 501 Not Implemented\r\n\r\n"; - } else { - Output() << "HTTP/1.0 200 OK\r\n" - "Content-Type: text/html\r\n" - "\r\n" - "Hello World!\r\n"; - } - - return true; -} - + Output() << "HTTP/1.0 501 Not Implemented\r\n\r\n"; + } else { + Output() << "HTTP/1.0 200 OK\r\n" + "Content-Type: text/html\r\n" + "\r\n" + "Hello World!\r\n"; + } + + return true; +} + bool TClientRequest::IsLocal() const { return HasLocalAddress(Socket()); } @@ -639,29 +639,29 @@ bool TClientRequest::CheckLoopback() { try { isLocal = IsLocal(); } catch (const yexception& e) { - Output() << "HTTP/1.0 500 Oops\r\n\r\n" - << e.what() << "\r\n"; - return false; + Output() << "HTTP/1.0 500 Oops\r\n\r\n" + << e.what() << "\r\n"; + return false; } if (!isLocal) { - Output() << "HTTP/1.0 403 Permission denied\r\n" - "Content-Type: text/html; charset=windows-1251\r\n" - "Connection: close\r\n" - "\r\n" - "<html><head><title>Permission denied</title></head>" - "<body><h1>Permission denied</h1>" - "<p>This request must be sent from the localhost.</p>" - "</body></html>\r\n"; - - return false; - } - - return true; -} - + Output() << "HTTP/1.0 403 Permission denied\r\n" + "Content-Type: text/html; charset=windows-1251\r\n" + "Connection: close\r\n" + "\r\n" + "<html><head><title>Permission denied</title></head>" + "<body><h1>Permission denied</h1>" + "<p>This request must be sent from the localhost.</p>" + "</body></html>\r\n"; + + return false; + } + + return true; +} + void TClientRequest::ReleaseConnection() { - if (Conn_ && HttpConn_ && HttpServ()->Options().KeepAliveEnabled && HttpConn_->CanBeKeepAlive() && (!HttpServ()->Options().RejectExcessConnections || !HttpServ()->MaxRequestsReached())) { + if (Conn_ && HttpConn_ && HttpServ()->Options().KeepAliveEnabled && HttpConn_->CanBeKeepAlive() && (!HttpServ()->Options().RejectExcessConnections || !HttpServ()->MaxRequestsReached())) { Output().Finish(); Conn_->DeActivate(); Y_UNUSED(Conn_.Release()); @@ -676,101 +676,101 @@ void TClientRequest::ResetConnection() { } } -void TClientRequest::Process(void* ThreadSpecificResource) { +void TClientRequest::Process(void* ThreadSpecificResource) { THolder<TClientRequest> this_(this); - + auto* serverImpl = Conn_->HttpServ_; - try { - if (!HttpConn_) { + try { + if (!HttpConn_) { const size_t outputBufferSize = HttpServ()->Options().OutputBufferSize; if (outputBufferSize) { HttpConn_.Reset(new THttpServerConn(Socket(), outputBufferSize)); } else { HttpConn_.Reset(new THttpServerConn(Socket())); } - + auto maxRequestsPerConnection = HttpServ()->Options().MaxRequestsPerConnection; HttpConn_->Output()->EnableKeepAlive(HttpServ()->Options().KeepAliveEnabled && (!maxRequestsPerConnection || Conn_->ReceivedRequests < maxRequestsPerConnection)); - HttpConn_->Output()->EnableCompression(HttpServ()->Options().CompressionEnabled); - } - + HttpConn_->Output()->EnableCompression(HttpServ()->Options().CompressionEnabled); + } + if (ParsedHeaders.empty()) { RequestString = Input().FirstLine(); - - const THttpHeaders& h = Input().Headers(); + + const THttpHeaders& h = Input().Headers(); ParsedHeaders.reserve(h.Count()); - for (THttpHeaders::TConstIterator it = h.Begin(); it != h.End(); ++it) { + for (THttpHeaders::TConstIterator it = h.Begin(); it != h.End(); ++it) { ParsedHeaders.emplace_back(it->Name(), it->Value()); - } - } - - if (Reply(ThreadSpecificResource)) { + } + } + + if (Reply(ThreadSpecificResource)) { ReleaseConnection(); - - /* - * *this will be destroyed... - */ - - return; - } - } catch (...) { + + /* + * *this will be destroyed... + */ + + return; + } + } catch (...) { serverImpl->Cb_->OnException(); - - throw; - } - + + throw; + } + Y_UNUSED(this_.Release()); -} - -void TClientRequest::ProcessFailRequest(int failstate) { +} + +void TClientRequest::ProcessFailRequest(int failstate) { Output() << "HTTP/1.1 503 Service Unavailable\r\n" "Content-Type: text/plain\r\n" "Content-Length: 21\r\n" "\r\n" "Service Unavailable\r\n"; - + TString url; - + if (!strnicmp(RequestString.data(), "GET ", 4)) { - // Trying to extract url... + // Trying to extract url... const char* str = RequestString.data(); - - // Skipping spaces before url... - size_t start = 3; + + // Skipping spaces before url... + size_t start = 3; while (str[start] == ' ') { - ++start; - } - - if (str[start]) { - // Traversing url... - size_t idx = start; - - while (str[idx] != ' ' && str[idx]) { - ++idx; - } - + ++start; + } + + if (str[start]) { + // Traversing url... + size_t idx = start; + + while (str[idx] != ' ' && str[idx]) { + ++idx; + } + url = RequestString.substr(start, idx - start); - } - } - - const THttpServer::ICallBack::TFailLogData d = { - failstate, - url, - }; - - // Handling failure... - Conn_->HttpServ_->Cb_->OnFailRequestEx(d); - Output().Flush(); -} - + } + } + + const THttpServer::ICallBack::TFailLogData d = { + failstate, + url, + }; + + // Handling failure... + Conn_->HttpServ_->Cb_->OnFailRequestEx(d); + Output().Flush(); +} + THttpServer* TClientRequest::HttpServ() const noexcept { - return Conn_->HttpServ_->Parent_; -} - + return Conn_->HttpServ_->Parent_; +} + const TSocket& TClientRequest::Socket() const noexcept { - return Conn_->Socket_; -} + return Conn_->Socket_; +} NAddr::IRemoteAddrRef TClientRequest::GetListenerSockAddrRef() const noexcept { return Conn_->ListenerSockAddrRef_; @@ -791,8 +791,8 @@ TRequestReplier::~TRequestReplier() { bool TRequestReplier::Reply(void* threadSpecificResource) { const TReplyParams params = { - threadSpecificResource, Input(), Output()}; - + threadSpecificResource, Input(), Output()}; + return DoReply(params); } @@ -833,7 +833,7 @@ bool TryToBindAddresses(const THttpServerOptions& options, const std::function<v return false; } - if (callbackOnBoundAddress != nullptr) { + if (callbackOnBoundAddress != nullptr) { (*callbackOnBoundAddress)(socket); } } |