diff options
author | somov <somov@yandex-team.ru> | 2022-02-10 16:45:47 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:47 +0300 |
commit | a5950576e397b1909261050b8c7da16db58f10b1 (patch) | |
tree | 7ba7677f6a4c3e19e2cefab34d16df2c8963b4d4 /library/cpp/http/server | |
parent | 81eddc8c0b55990194e112b02d127b87d54164a9 (diff) | |
download | ydb-a5950576e397b1909261050b8c7da16db58f10b1.tar.gz |
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/http/server')
-rw-r--r-- | library/cpp/http/server/http.cpp | 292 | ||||
-rw-r--r-- | library/cpp/http/server/http.h | 126 | ||||
-rw-r--r-- | library/cpp/http/server/http_ut.cpp | 2 | ||||
-rw-r--r-- | library/cpp/http/server/ut/ya.make | 2 |
4 files changed, 211 insertions, 211 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index 128583bdd7..fbd127a652 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -1,4 +1,4 @@ -#include "http.h" +#include "http.h" #include "http_ex.h" #include <library/cpp/threading/equeue/equeue.h> @@ -7,25 +7,25 @@ #include <util/generic/cast.h> #include <util/generic/intrlist.h> #include <util/generic/yexception.h> -#include <util/network/address.h> +#include <util/network/address.h> #include <util/network/socket.h> -#include <util/network/poller.h> -#include <util/system/atomic.h> -#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy -#include <util/system/defaults.h> -#include <util/system/event.h> -#include <util/system/mutex.h> +#include <util/network/poller.h> +#include <util/system/atomic.h> +#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy +#include <util/system/defaults.h> +#include <util/system/event.h> +#include <util/system/mutex.h> #include <util/system/pipe.h> -#include <util/system/thread.h> +#include <util/system/thread.h> #include <util/thread/factory.h> -#include <cerrno> -#include <cstring> -#include <ctime> - -#include <sys/stat.h> -#include <sys/types.h> +#include <cerrno> +#include <cstring> +#include <ctime> +#include <sys/stat.h> +#include <sys/types.h> + using namespace NAddr; namespace { @@ -51,18 +51,18 @@ namespace { } class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> { -public: +public: TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef); ~TClientConnection() override; void OnPollEvent(TInstant now) override; inline void Activate(TInstant now) noexcept; - inline void DeActivate(); + inline void DeActivate(); inline void Reject(); -public: - TSocket Socket_; +public: + TSocket Socket_; NAddr::IRemoteAddrRef ListenerSockAddrRef_; THttpServer::TImpl* HttpServ_ = nullptr; bool Reject_ = false; @@ -72,70 +72,70 @@ public: }; class THttpServer::TImpl { -public: - class TConnections { +public: + class TConnections { public: inline TConnections(TSocketPoller* poller, const THttpServerOptions& options) - : Poller_(poller) + : Poller_(poller) , Options(options) - { - } + { + } inline ~TConnections() { - } + } inline void Add(TClientConnection* c) noexcept { - TGuard<TMutex> g(Mutex_); + TGuard<TMutex> g(Mutex_); - Conns_.PushBack(c); - Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c)); - } + Conns_.PushBack(c); + Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c)); + } inline void Erase(TClientConnection* c, TInstant now) noexcept { - TGuard<TMutex> g(Mutex_); + TGuard<TMutex> g(Mutex_); EraseUnsafe(c); if (Options.ExpirationTimeout > TDuration::Zero()) { TryRemovingUnsafe(now - Options.ExpirationTimeout); } - } + } inline void Clear() noexcept { - TGuard<TMutex> g(Mutex_); + TGuard<TMutex> g(Mutex_); - Conns_.Clear(); - } + Conns_.Clear(); + } inline bool RemoveOld(TInstant border) noexcept { - TGuard<TMutex> g(Mutex_); + TGuard<TMutex> g(Mutex_); return TryRemovingUnsafe(border); } bool TryRemovingUnsafe(TInstant border) noexcept { - if (Conns_.Empty()) { + if (Conns_.Empty()) { return false; - } - TClientConnection* c = &*(Conns_.Begin()); + } + TClientConnection* c = &*(Conns_.Begin()); if (c->LastUsed > border) { return false; } EraseUnsafe(c); - delete c; + delete c; return true; - } + } void EraseUnsafe(TClientConnection* c) noexcept { Poller_->Unwait(c->Socket_); c->Unlink(); } - public: - TMutex Mutex_; - TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_; + public: + TMutex Mutex_; + TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_; TSocketPoller* Poller_ = nullptr; const THttpServerOptions& Options; - }; + }; - static void* ListenSocketFunction(void* param) { + static void* ListenSocketFunction(void* param) { try { ((TImpl*)param)->ListenSocket(); } catch (...) { @@ -143,19 +143,19 @@ public: } return nullptr; - } + } - TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) { - THolder<TClientRequest> obj(Cb_->CreateClient()); + TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) { + THolder<TClientRequest> obj(Cb_->CreateClient()); - obj->Conn_.Reset(c.Release()); - - return obj; - } + obj->Conn_.Reset(c.Release()); + return obj; + } + void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) { if (MaxRequestsReached()) { - Cb_->OnMaxConn(); + Cb_->OnMaxConn(); bool wasRemoved = Connections->RemoveOld(TInstant::Max()); if (!wasRemoved && Options_.RejectExcessConnections) { (new TClientConnection(s, this, listenerSockAddrRef))->Reject(); @@ -166,27 +166,27 @@ public: auto connection = new TClientConnection(s, this, listenerSockAddrRef); connection->LastUsed = now; connection->DeActivate(); - } + } - void SaveErrorCode() { + void SaveErrorCode() { ErrorCode = WSAGetLastError(); - } + } int GetErrorCode() const { return ErrorCode; - } + } const char* GetError() const { return LastSystemErrorText(ErrorCode); - } + } - bool Start() { - Poller.Reset(new TSocketPoller()); + bool Start() { + Poller.Reset(new TSocketPoller()); Connections.Reset(new TConnections(Poller.Get(), Options_)); - + // Start the listener thread - ListenerRunningOK = false; - + ListenerRunningOK = false; + // throws on error TPipeHandle::Pipe(ListenWakeupReadFd, ListenWakeupWriteFd); @@ -195,31 +195,31 @@ public: Poller->WaitRead(ListenWakeupReadFd, &WakeupPollAble); - ListenStartEvent.Reset(); - try { + ListenStartEvent.Reset(); + try { ListenThread.Reset(new TThread(ListenSocketFunction, this)); ListenThread->Start(); - } catch (const yexception&) { - SaveErrorCode(); - return false; + } catch (const yexception&) { + SaveErrorCode(); + return false; } // Wait until the thread has completely started and return the success indicator - ListenStartEvent.Wait(); + ListenStartEvent.Wait(); - return ListenerRunningOK; - } + return ListenerRunningOK; + } - void Wait() { - Cb_->OnWait(); + void Wait() { + Cb_->OnWait(); TGuard<TMutex> g(StopMutex); if (ListenThread) { ListenThread->Join(); ListenThread.Reset(nullptr); } - } + } - void Stop() { + void Stop() { Shutdown(); TGuard<TMutex> g(StopMutex); @@ -228,19 +228,19 @@ public: ListenThread.Reset(nullptr); } - while (ConnectionCount) { - usleep(10000); - Connections->Clear(); + while (ConnectionCount) { + usleep(10000); + Connections->Clear(); } - Connections.Destroy(); - Poller.Destroy(); - } + Connections.Destroy(); + Poller.Destroy(); + } - void Shutdown() { + void Shutdown() { ListenWakeupWriteFd.Write("", 1); // ignore result - } + } void AddRequest(TAutoPtr<TClientRequest> req, bool fail) { struct TFailRequest: public THttpClientRequestEx { @@ -257,20 +257,20 @@ public: ProcessFailRequest(0); return true; } - }; + }; if (!fail && Requests->Add(req.Get())) { Y_UNUSED(req.Release()); - } else { - req = new TFailRequest(req); + } else { + req = new TFailRequest(req); - if (FailRequests->Add(req.Get())) { + if (FailRequests->Add(req.Get())) { Y_UNUSED(req.Release()); } else { - Cb_->OnFailRequest(-1); + Cb_->OnFailRequest(-1); } } - } + } size_t GetRequestQueueSize() const { return Requests->Size(); @@ -305,8 +305,8 @@ public: if (s == INVALID_SOCKET) { ythrow yexception() << "accept: " << LastSystemErrorText(); - } - + } + Server_->AddRequestFromSocket(s, TInstant::Now(), SockAddrRef_); } @@ -318,13 +318,13 @@ public: TSocket S_; TImpl* Server_ = nullptr; NAddr::IRemoteAddrRef SockAddrRef_; - }; + }; - void ListenSocket() { + void ListenSocket() { TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str()); ErrorCode = 0; - TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs; + TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs; std::function<void(TSocket)> callback = [&](TSocket socket) { THolder<TListenSocket> ls(new TListenSocket(socket, this)); @@ -337,65 +337,65 @@ public: ListenStartEvent.Signal(); return; - } + } - Requests->Start(Options_.nThreads, Options_.MaxQueueSize); - FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize); - Cb_->OnListenStart(); - ListenerRunningOK = true; - ListenStartEvent.Signal(); + Requests->Start(Options_.nThreads, Options_.MaxQueueSize); + FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize); + Cb_->OnListenStart(); + ListenerRunningOK = true; + ListenStartEvent.Signal(); TVector<void*> events; - events.resize(1); + events.resize(1); TInstant now = TInstant::Now(); for (;;) { - try { + try { const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout; const size_t ret = Poller->WaitD(events.data(), events.size(), deadline); now = TInstant::Now(); - for (size_t i = 0; i < ret; ++i) { + for (size_t i = 0; i < ret; ++i) { ((IPollAble*)events[i])->OnPollEvent(now); - } + } if (ret == 0 && Options_.ExpirationTimeout > TDuration::Zero()) { Connections->RemoveOld(now - Options_.ExpirationTimeout); } // When MaxConnections is limited or ExpirationTimeout is set, OnPollEvent can call - // RemoveOld and destroy other IPollAble* objects in the - // poller. Thus in this case we can safely process only one - // event from the poller at a time. + // RemoveOld and destroy other IPollAble* objects in the + // poller. Thus in this case we can safely process only one + // event from the poller at a time. if (!Options_.MaxConnections && Options_.ExpirationTimeout == TDuration::Zero()) { if (ret >= events.size()) { - events.resize(ret * 2); + events.resize(ret * 2); } } } catch (const TShouldStop&) { break; - } catch (...) { - Cb_->OnException(); + } catch (...) { + Cb_->OnException(); } } - while (!Reqs.Empty()) { - THolder<TListenSocket> ls(Reqs.PopFront()); + while (!Reqs.Empty()) { + THolder<TListenSocket> ls(Reqs.PopFront()); - Poller->Unwait(ls->GetSocket()); + Poller->Unwait(ls->GetSocket()); } - Requests->Stop(); - FailRequests->Stop(); - Cb_->OnListenStop(); - } + Requests->Stop(); + FailRequests->Stop(); + Cb_->OnListenStop(); + } - void RestartRequestThreads(ui32 nTh, ui32 maxQS) { - Requests->Stop(); - Options_.nThreads = nTh; - Options_.MaxQueueSize = maxQS; - Requests->Start(Options_.nThreads, Options_.MaxQueueSize); - } + void RestartRequestThreads(ui32 nTh, ui32 maxQS) { + Requests->Stop(); + Options_.nThreads = nTh; + Options_.MaxQueueSize = maxQS; + Requests->Start(Options_.nThreads, Options_.MaxQueueSize); + } TImpl(THttpServer* parent, ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options_) : Requests(mainWorkers) @@ -415,29 +415,29 @@ public: options) { } - ~TImpl() { - try { - Stop(); - } catch (...) { + ~TImpl() { + try { + Stop(); + } catch (...) { } - } + } inline const TOptions& Options() const noexcept { - return Options_; - } + return Options_; + } inline void DecreaseConnections() noexcept { - AtomicDecrement(ConnectionCount); - } + AtomicDecrement(ConnectionCount); + } inline void IncreaseConnections() noexcept { - AtomicIncrement(ConnectionCount); - } - - inline i64 GetClientCount() const { - return AtomicGet(ConnectionCount); - } - + AtomicIncrement(ConnectionCount); + } + + inline i64 GetClientCount() const { + return AtomicGet(ConnectionCount); + } + inline bool MaxRequestsReached() const { return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections); } @@ -449,11 +449,11 @@ public: TMtpQueueRef Requests; TMtpQueueRef FailRequests; TAtomic ConnectionCount = 0; - THolder<TSocketPoller> Poller; - THolder<TConnections> Connections; + THolder<TSocketPoller> Poller; + THolder<TConnections> Connections; bool ListenerRunningOK = false; int ErrorCode = 0; - TOptions Options_; + TOptions Options_; ICallBack* Cb_ = nullptr; THttpServer* Parent_ = nullptr; TWakeupPollAble WakeupPollAble; @@ -571,7 +571,7 @@ TClientConnection::~TClientConnection() { } void TClientConnection::OnPollEvent(TInstant now) { - THolder<TClientConnection> this_(this); + THolder<TClientConnection> this_(this); Activate(now); { @@ -588,7 +588,7 @@ void TClientConnection::OnPollEvent(TInstant now) { } } - THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_)); + THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_)); AcceptMoment = now; HttpServ_->AddRequest(obj, Reject_); @@ -601,7 +601,7 @@ void TClientConnection::Activate(TInstant now) noexcept { } void TClientConnection::DeActivate() { - HttpServ_->Connections->Add(this); + HttpServ_->Connections->Add(this); } void TClientConnection::Reject() { @@ -677,7 +677,7 @@ void TClientRequest::ResetConnection() { } void TClientRequest::Process(void* ThreadSpecificResource) { - THolder<TClientRequest> this_(this); + THolder<TClientRequest> this_(this); auto* serverImpl = Conn_->HttpServ_; diff --git a/library/cpp/http/server/http.h b/library/cpp/http/server/http.h index b292d38f27..d0bc92ff7d 100644 --- a/library/cpp/http/server/http.h +++ b/library/cpp/http/server/http.h @@ -15,75 +15,75 @@ class TClientRequest; class TClientConnection; class THttpServer { - friend class TClientRequest; - friend class TClientConnection; + friend class TClientRequest; + friend class TClientConnection; -public: - class ICallBack { +public: + class ICallBack { public: - struct TFailLogData { - int failstate; + struct TFailLogData { + int failstate; TString url; - }; + }; - virtual ~ICallBack() { - } + virtual ~ICallBack() { + } - virtual void OnFailRequest(int /*failstate*/) { - } + virtual void OnFailRequest(int /*failstate*/) { + } - virtual void OnFailRequestEx(const TFailLogData& d) { - OnFailRequest(d.failstate); - } + virtual void OnFailRequestEx(const TFailLogData& d) { + OnFailRequest(d.failstate); + } - virtual void OnException() { - } + virtual void OnException() { + } - virtual void OnMaxConn() { - } + virtual void OnMaxConn() { + } - virtual TClientRequest* CreateClient() = 0; + virtual TClientRequest* CreateClient() = 0; - virtual void OnListenStart() { - } + virtual void OnListenStart() { + } - virtual void OnListenStop() { - } + virtual void OnListenStop() { + } - virtual void OnWait() { - } + virtual void OnWait() { + } virtual void* CreateThreadSpecificResource() { return nullptr; - } + } - virtual void DestroyThreadSpecificResource(void*) { - } - }; + virtual void DestroyThreadSpecificResource(void*) { + } + }; - typedef THttpServerOptions TOptions; + typedef THttpServerOptions TOptions; typedef TSimpleSharedPtr<IThreadPool> TMtpQueueRef; THttpServer(ICallBack* cb, const TOptions& options = TOptions(), IThreadFactory* pool = nullptr); THttpServer(ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options = TOptions()); - virtual ~THttpServer(); + virtual ~THttpServer(); - bool Start(); + bool Start(); - // shutdown a.s.a.p. - void Stop(); + // shutdown a.s.a.p. + void Stop(); // graceful shutdown with serving all already open connections - void Shutdown(); + void Shutdown(); - void Wait(); - int GetErrorCode(); - const char* GetError(); - void RestartRequestThreads(ui32 nTh, ui32 maxQS); + void Wait(); + int GetErrorCode(); + const char* GetError(); + void RestartRequestThreads(ui32 nTh, ui32 maxQS); const TOptions& Options() const noexcept; - i64 GetClientCount() const; + i64 GetClientCount() const; - class TImpl; + class TImpl; size_t GetRequestQueueSize() const; size_t GetFailQueueSize() const; @@ -92,30 +92,30 @@ public: static TAtomicBase AcceptReturnsInvalidSocketCounter(); -private: +private: bool MaxRequestsReached() const; private: - THolder<TImpl> Impl_; + THolder<TImpl> Impl_; }; /** * @deprecated Use TRequestReplier instead */ class TClientRequest: public IObjectInQueue { - friend class THttpServer::TImpl; + friend class THttpServer::TImpl; -public: - TClientRequest(); +public: + TClientRequest(); ~TClientRequest() override; inline THttpInput& Input() noexcept { - return *HttpConn_->Input(); - } + return *HttpConn_->Input(); + } inline THttpOutput& Output() noexcept { - return *HttpConn_->Output(); - } + return *HttpConn_->Output(); + } THttpServer* HttpServ() const noexcept; const TSocket& Socket() const noexcept; @@ -123,29 +123,29 @@ public: TInstant AcceptMoment() const noexcept; bool IsLocal() const; - bool CheckLoopback(); - void ProcessFailRequest(int failstate); + bool CheckLoopback(); + void ProcessFailRequest(int failstate); void ReleaseConnection(); void ResetConnection(); -private: - /* - * Processes the request after 'connection' been created and 'Headers' been read - * Returns 'false' if the processing must be continued by the next handler, - * 'true' otherwise ('this' will be deleted) - */ - virtual bool Reply(void* ThreadSpecificResource); +private: + /* + * Processes the request after 'connection' been created and 'Headers' been read + * Returns 'false' if the processing must be continued by the next handler, + * 'true' otherwise ('this' will be deleted) + */ + virtual bool Reply(void* ThreadSpecificResource); void Process(void* ThreadSpecificResource) override; -public: +public: TVector<std::pair<TString, TString>> ParsedHeaders; TString RequestString; -private: - THolder<TClientConnection> Conn_; - THolder<THttpServerConn> HttpConn_; +private: + THolder<TClientConnection> Conn_; + THolder<THttpServerConn> HttpConn_; }; class TRequestReplier: public TClientRequest { diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp index cc62bb988e..5f21fc935c 100644 --- a/library/cpp/http/server/http_ut.cpp +++ b/library/cpp/http/server/http_ut.cpp @@ -330,7 +330,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true)); for (int i = 0; i < 2; ++i) { - UNIT_ASSERT(server.Start()); + UNIT_ASSERT(server.Start()); TTestRequest r(port); r.Content = res; diff --git a/library/cpp/http/server/ut/ya.make b/library/cpp/http/server/ut/ya.make index bcb4d4c0b8..47a208e867 100644 --- a/library/cpp/http/server/ut/ya.make +++ b/library/cpp/http/server/ut/ya.make @@ -1,6 +1,6 @@ UNITTEST_FOR(library/cpp/http/server) -OWNER(pg) +OWNER(pg) SIZE(MEDIUM) |