diff options
author | somov <somov@yandex-team.ru> | 2022-02-10 16:45:47 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:47 +0300 |
commit | a5950576e397b1909261050b8c7da16db58f10b1 (patch) | |
tree | 7ba7677f6a4c3e19e2cefab34d16df2c8963b4d4 /library/cpp/http/server/http.cpp | |
parent | 81eddc8c0b55990194e112b02d127b87d54164a9 (diff) | |
download | ydb-a5950576e397b1909261050b8c7da16db58f10b1.tar.gz |
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/http/server/http.cpp')
-rw-r--r-- | library/cpp/http/server/http.cpp | 292 |
1 files changed, 146 insertions, 146 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index 128583bdd70..fbd127a6520 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -1,4 +1,4 @@ -#include "http.h" +#include "http.h" #include "http_ex.h" #include <library/cpp/threading/equeue/equeue.h> @@ -7,25 +7,25 @@ #include <util/generic/cast.h> #include <util/generic/intrlist.h> #include <util/generic/yexception.h> -#include <util/network/address.h> +#include <util/network/address.h> #include <util/network/socket.h> -#include <util/network/poller.h> -#include <util/system/atomic.h> -#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy -#include <util/system/defaults.h> -#include <util/system/event.h> -#include <util/system/mutex.h> +#include <util/network/poller.h> +#include <util/system/atomic.h> +#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy +#include <util/system/defaults.h> +#include <util/system/event.h> +#include <util/system/mutex.h> #include <util/system/pipe.h> -#include <util/system/thread.h> +#include <util/system/thread.h> #include <util/thread/factory.h> -#include <cerrno> -#include <cstring> -#include <ctime> - -#include <sys/stat.h> -#include <sys/types.h> +#include <cerrno> +#include <cstring> +#include <ctime> +#include <sys/stat.h> +#include <sys/types.h> + using namespace NAddr; namespace { @@ -51,18 +51,18 @@ namespace { } class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> { -public: +public: TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef); ~TClientConnection() override; void OnPollEvent(TInstant now) override; inline void Activate(TInstant now) noexcept; - inline void DeActivate(); + inline void DeActivate(); inline void Reject(); -public: - TSocket Socket_; +public: + TSocket Socket_; NAddr::IRemoteAddrRef ListenerSockAddrRef_; THttpServer::TImpl* HttpServ_ = nullptr; bool Reject_ = false; @@ -72,70 +72,70 @@ public: }; class THttpServer::TImpl { -public: - class TConnections { +public: + class TConnections { public: inline TConnections(TSocketPoller* poller, const THttpServerOptions& options) - : Poller_(poller) + : Poller_(poller) , Options(options) - { - } + { + } inline ~TConnections() { - } + } inline void Add(TClientConnection* c) noexcept { - TGuard<TMutex> g(Mutex_); + TGuard<TMutex> g(Mutex_); - Conns_.PushBack(c); - Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c)); - } + Conns_.PushBack(c); + Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c)); + } inline void Erase(TClientConnection* c, TInstant now) noexcept { - TGuard<TMutex> g(Mutex_); + TGuard<TMutex> g(Mutex_); EraseUnsafe(c); if (Options.ExpirationTimeout > TDuration::Zero()) { TryRemovingUnsafe(now - Options.ExpirationTimeout); } - } + } inline void Clear() noexcept { - TGuard<TMutex> g(Mutex_); + TGuard<TMutex> g(Mutex_); - Conns_.Clear(); - } + Conns_.Clear(); + } inline bool RemoveOld(TInstant border) noexcept { - TGuard<TMutex> g(Mutex_); + TGuard<TMutex> g(Mutex_); return TryRemovingUnsafe(border); } bool TryRemovingUnsafe(TInstant border) noexcept { - if (Conns_.Empty()) { + if (Conns_.Empty()) { return false; - } - TClientConnection* c = &*(Conns_.Begin()); + } + TClientConnection* c = &*(Conns_.Begin()); if (c->LastUsed > border) { return false; } EraseUnsafe(c); - delete c; + delete c; return true; - } + } void EraseUnsafe(TClientConnection* c) noexcept { Poller_->Unwait(c->Socket_); c->Unlink(); } - public: - TMutex Mutex_; - TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_; + public: + TMutex Mutex_; + TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_; TSocketPoller* Poller_ = nullptr; const THttpServerOptions& Options; - }; + }; - static void* ListenSocketFunction(void* param) { + static void* ListenSocketFunction(void* param) { try { ((TImpl*)param)->ListenSocket(); } catch (...) { @@ -143,19 +143,19 @@ public: } return nullptr; - } + } - TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) { - THolder<TClientRequest> obj(Cb_->CreateClient()); + TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) { + THolder<TClientRequest> obj(Cb_->CreateClient()); - obj->Conn_.Reset(c.Release()); - - return obj; - } + obj->Conn_.Reset(c.Release()); + return obj; + } + void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) { if (MaxRequestsReached()) { - Cb_->OnMaxConn(); + Cb_->OnMaxConn(); bool wasRemoved = Connections->RemoveOld(TInstant::Max()); if (!wasRemoved && Options_.RejectExcessConnections) { (new TClientConnection(s, this, listenerSockAddrRef))->Reject(); @@ -166,27 +166,27 @@ public: auto connection = new TClientConnection(s, this, listenerSockAddrRef); connection->LastUsed = now; connection->DeActivate(); - } + } - void SaveErrorCode() { + void SaveErrorCode() { ErrorCode = WSAGetLastError(); - } + } int GetErrorCode() const { return ErrorCode; - } + } const char* GetError() const { return LastSystemErrorText(ErrorCode); - } + } - bool Start() { - Poller.Reset(new TSocketPoller()); + bool Start() { + Poller.Reset(new TSocketPoller()); Connections.Reset(new TConnections(Poller.Get(), Options_)); - + // Start the listener thread - ListenerRunningOK = false; - + ListenerRunningOK = false; + // throws on error TPipeHandle::Pipe(ListenWakeupReadFd, ListenWakeupWriteFd); @@ -195,31 +195,31 @@ public: Poller->WaitRead(ListenWakeupReadFd, &WakeupPollAble); - ListenStartEvent.Reset(); - try { + ListenStartEvent.Reset(); + try { ListenThread.Reset(new TThread(ListenSocketFunction, this)); ListenThread->Start(); - } catch (const yexception&) { - SaveErrorCode(); - return false; + } catch (const yexception&) { + SaveErrorCode(); + return false; } // Wait until the thread has completely started and return the success indicator - ListenStartEvent.Wait(); + ListenStartEvent.Wait(); - return ListenerRunningOK; - } + return ListenerRunningOK; + } - void Wait() { - Cb_->OnWait(); + void Wait() { + Cb_->OnWait(); TGuard<TMutex> g(StopMutex); if (ListenThread) { ListenThread->Join(); ListenThread.Reset(nullptr); } - } + } - void Stop() { + void Stop() { Shutdown(); TGuard<TMutex> g(StopMutex); @@ -228,19 +228,19 @@ public: ListenThread.Reset(nullptr); } - while (ConnectionCount) { - usleep(10000); - Connections->Clear(); + while (ConnectionCount) { + usleep(10000); + Connections->Clear(); } - Connections.Destroy(); - Poller.Destroy(); - } + Connections.Destroy(); + Poller.Destroy(); + } - void Shutdown() { + void Shutdown() { ListenWakeupWriteFd.Write("", 1); // ignore result - } + } void AddRequest(TAutoPtr<TClientRequest> req, bool fail) { struct TFailRequest: public THttpClientRequestEx { @@ -257,20 +257,20 @@ public: ProcessFailRequest(0); return true; } - }; + }; if (!fail && Requests->Add(req.Get())) { Y_UNUSED(req.Release()); - } else { - req = new TFailRequest(req); + } else { + req = new TFailRequest(req); - if (FailRequests->Add(req.Get())) { + if (FailRequests->Add(req.Get())) { Y_UNUSED(req.Release()); } else { - Cb_->OnFailRequest(-1); + Cb_->OnFailRequest(-1); } } - } + } size_t GetRequestQueueSize() const { return Requests->Size(); @@ -305,8 +305,8 @@ public: if (s == INVALID_SOCKET) { ythrow yexception() << "accept: " << LastSystemErrorText(); - } - + } + Server_->AddRequestFromSocket(s, TInstant::Now(), SockAddrRef_); } @@ -318,13 +318,13 @@ public: TSocket S_; TImpl* Server_ = nullptr; NAddr::IRemoteAddrRef SockAddrRef_; - }; + }; - void ListenSocket() { + void ListenSocket() { TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str()); ErrorCode = 0; - TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs; + TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs; std::function<void(TSocket)> callback = [&](TSocket socket) { THolder<TListenSocket> ls(new TListenSocket(socket, this)); @@ -337,65 +337,65 @@ public: ListenStartEvent.Signal(); return; - } + } - Requests->Start(Options_.nThreads, Options_.MaxQueueSize); - FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize); - Cb_->OnListenStart(); - ListenerRunningOK = true; - ListenStartEvent.Signal(); + Requests->Start(Options_.nThreads, Options_.MaxQueueSize); + FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize); + Cb_->OnListenStart(); + ListenerRunningOK = true; + ListenStartEvent.Signal(); TVector<void*> events; - events.resize(1); + events.resize(1); TInstant now = TInstant::Now(); for (;;) { - try { + try { const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout; const size_t ret = Poller->WaitD(events.data(), events.size(), deadline); now = TInstant::Now(); - for (size_t i = 0; i < ret; ++i) { + for (size_t i = 0; i < ret; ++i) { ((IPollAble*)events[i])->OnPollEvent(now); - } + } if (ret == 0 && Options_.ExpirationTimeout > TDuration::Zero()) { Connections->RemoveOld(now - Options_.ExpirationTimeout); } // When MaxConnections is limited or ExpirationTimeout is set, OnPollEvent can call - // RemoveOld and destroy other IPollAble* objects in the - // poller. Thus in this case we can safely process only one - // event from the poller at a time. + // RemoveOld and destroy other IPollAble* objects in the + // poller. Thus in this case we can safely process only one + // event from the poller at a time. if (!Options_.MaxConnections && Options_.ExpirationTimeout == TDuration::Zero()) { if (ret >= events.size()) { - events.resize(ret * 2); + events.resize(ret * 2); } } } catch (const TShouldStop&) { break; - } catch (...) { - Cb_->OnException(); + } catch (...) { + Cb_->OnException(); } } - while (!Reqs.Empty()) { - THolder<TListenSocket> ls(Reqs.PopFront()); + while (!Reqs.Empty()) { + THolder<TListenSocket> ls(Reqs.PopFront()); - Poller->Unwait(ls->GetSocket()); + Poller->Unwait(ls->GetSocket()); } - Requests->Stop(); - FailRequests->Stop(); - Cb_->OnListenStop(); - } + Requests->Stop(); + FailRequests->Stop(); + Cb_->OnListenStop(); + } - void RestartRequestThreads(ui32 nTh, ui32 maxQS) { - Requests->Stop(); - Options_.nThreads = nTh; - Options_.MaxQueueSize = maxQS; - Requests->Start(Options_.nThreads, Options_.MaxQueueSize); - } + void RestartRequestThreads(ui32 nTh, ui32 maxQS) { + Requests->Stop(); + Options_.nThreads = nTh; + Options_.MaxQueueSize = maxQS; + Requests->Start(Options_.nThreads, Options_.MaxQueueSize); + } TImpl(THttpServer* parent, ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options_) : Requests(mainWorkers) @@ -415,29 +415,29 @@ public: options) { } - ~TImpl() { - try { - Stop(); - } catch (...) { + ~TImpl() { + try { + Stop(); + } catch (...) { } - } + } inline const TOptions& Options() const noexcept { - return Options_; - } + return Options_; + } inline void DecreaseConnections() noexcept { - AtomicDecrement(ConnectionCount); - } + AtomicDecrement(ConnectionCount); + } inline void IncreaseConnections() noexcept { - AtomicIncrement(ConnectionCount); - } - - inline i64 GetClientCount() const { - return AtomicGet(ConnectionCount); - } - + AtomicIncrement(ConnectionCount); + } + + inline i64 GetClientCount() const { + return AtomicGet(ConnectionCount); + } + inline bool MaxRequestsReached() const { return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections); } @@ -449,11 +449,11 @@ public: TMtpQueueRef Requests; TMtpQueueRef FailRequests; TAtomic ConnectionCount = 0; - THolder<TSocketPoller> Poller; - THolder<TConnections> Connections; + THolder<TSocketPoller> Poller; + THolder<TConnections> Connections; bool ListenerRunningOK = false; int ErrorCode = 0; - TOptions Options_; + TOptions Options_; ICallBack* Cb_ = nullptr; THttpServer* Parent_ = nullptr; TWakeupPollAble WakeupPollAble; @@ -571,7 +571,7 @@ TClientConnection::~TClientConnection() { } void TClientConnection::OnPollEvent(TInstant now) { - THolder<TClientConnection> this_(this); + THolder<TClientConnection> this_(this); Activate(now); { @@ -588,7 +588,7 @@ void TClientConnection::OnPollEvent(TInstant now) { } } - THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_)); + THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_)); AcceptMoment = now; HttpServ_->AddRequest(obj, Reject_); @@ -601,7 +601,7 @@ void TClientConnection::Activate(TInstant now) noexcept { } void TClientConnection::DeActivate() { - HttpServ_->Connections->Add(this); + HttpServ_->Connections->Add(this); } void TClientConnection::Reject() { @@ -677,7 +677,7 @@ void TClientRequest::ResetConnection() { } void TClientRequest::Process(void* ThreadSpecificResource) { - THolder<TClientRequest> this_(this); + THolder<TClientRequest> this_(this); auto* serverImpl = Conn_->HttpServ_; |