aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/http/server/http.cpp
diff options
context:
space:
mode:
authorsomov <somov@yandex-team.ru>2022-02-10 16:45:47 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:47 +0300
commita5950576e397b1909261050b8c7da16db58f10b1 (patch)
tree7ba7677f6a4c3e19e2cefab34d16df2c8963b4d4 /library/cpp/http/server/http.cpp
parent81eddc8c0b55990194e112b02d127b87d54164a9 (diff)
downloadydb-a5950576e397b1909261050b8c7da16db58f10b1.tar.gz
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/http/server/http.cpp')
-rw-r--r--library/cpp/http/server/http.cpp292
1 files changed, 146 insertions, 146 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp
index 128583bdd70..fbd127a6520 100644
--- a/library/cpp/http/server/http.cpp
+++ b/library/cpp/http/server/http.cpp
@@ -1,4 +1,4 @@
-#include "http.h"
+#include "http.h"
#include "http_ex.h"
#include <library/cpp/threading/equeue/equeue.h>
@@ -7,25 +7,25 @@
#include <util/generic/cast.h>
#include <util/generic/intrlist.h>
#include <util/generic/yexception.h>
-#include <util/network/address.h>
+#include <util/network/address.h>
#include <util/network/socket.h>
-#include <util/network/poller.h>
-#include <util/system/atomic.h>
-#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy
-#include <util/system/defaults.h>
-#include <util/system/event.h>
-#include <util/system/mutex.h>
+#include <util/network/poller.h>
+#include <util/system/atomic.h>
+#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy
+#include <util/system/defaults.h>
+#include <util/system/event.h>
+#include <util/system/mutex.h>
#include <util/system/pipe.h>
-#include <util/system/thread.h>
+#include <util/system/thread.h>
#include <util/thread/factory.h>
-#include <cerrno>
-#include <cstring>
-#include <ctime>
-
-#include <sys/stat.h>
-#include <sys/types.h>
+#include <cerrno>
+#include <cstring>
+#include <ctime>
+#include <sys/stat.h>
+#include <sys/types.h>
+
using namespace NAddr;
namespace {
@@ -51,18 +51,18 @@ namespace {
}
class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> {
-public:
+public:
TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef);
~TClientConnection() override;
void OnPollEvent(TInstant now) override;
inline void Activate(TInstant now) noexcept;
- inline void DeActivate();
+ inline void DeActivate();
inline void Reject();
-public:
- TSocket Socket_;
+public:
+ TSocket Socket_;
NAddr::IRemoteAddrRef ListenerSockAddrRef_;
THttpServer::TImpl* HttpServ_ = nullptr;
bool Reject_ = false;
@@ -72,70 +72,70 @@ public:
};
class THttpServer::TImpl {
-public:
- class TConnections {
+public:
+ class TConnections {
public:
inline TConnections(TSocketPoller* poller, const THttpServerOptions& options)
- : Poller_(poller)
+ : Poller_(poller)
, Options(options)
- {
- }
+ {
+ }
inline ~TConnections() {
- }
+ }
inline void Add(TClientConnection* c) noexcept {
- TGuard<TMutex> g(Mutex_);
+ TGuard<TMutex> g(Mutex_);
- Conns_.PushBack(c);
- Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c));
- }
+ Conns_.PushBack(c);
+ Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c));
+ }
inline void Erase(TClientConnection* c, TInstant now) noexcept {
- TGuard<TMutex> g(Mutex_);
+ TGuard<TMutex> g(Mutex_);
EraseUnsafe(c);
if (Options.ExpirationTimeout > TDuration::Zero()) {
TryRemovingUnsafe(now - Options.ExpirationTimeout);
}
- }
+ }
inline void Clear() noexcept {
- TGuard<TMutex> g(Mutex_);
+ TGuard<TMutex> g(Mutex_);
- Conns_.Clear();
- }
+ Conns_.Clear();
+ }
inline bool RemoveOld(TInstant border) noexcept {
- TGuard<TMutex> g(Mutex_);
+ TGuard<TMutex> g(Mutex_);
return TryRemovingUnsafe(border);
}
bool TryRemovingUnsafe(TInstant border) noexcept {
- if (Conns_.Empty()) {
+ if (Conns_.Empty()) {
return false;
- }
- TClientConnection* c = &*(Conns_.Begin());
+ }
+ TClientConnection* c = &*(Conns_.Begin());
if (c->LastUsed > border) {
return false;
}
EraseUnsafe(c);
- delete c;
+ delete c;
return true;
- }
+ }
void EraseUnsafe(TClientConnection* c) noexcept {
Poller_->Unwait(c->Socket_);
c->Unlink();
}
- public:
- TMutex Mutex_;
- TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_;
+ public:
+ TMutex Mutex_;
+ TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_;
TSocketPoller* Poller_ = nullptr;
const THttpServerOptions& Options;
- };
+ };
- static void* ListenSocketFunction(void* param) {
+ static void* ListenSocketFunction(void* param) {
try {
((TImpl*)param)->ListenSocket();
} catch (...) {
@@ -143,19 +143,19 @@ public:
}
return nullptr;
- }
+ }
- TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) {
- THolder<TClientRequest> obj(Cb_->CreateClient());
+ TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) {
+ THolder<TClientRequest> obj(Cb_->CreateClient());
- obj->Conn_.Reset(c.Release());
-
- return obj;
- }
+ obj->Conn_.Reset(c.Release());
+ return obj;
+ }
+
void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) {
if (MaxRequestsReached()) {
- Cb_->OnMaxConn();
+ Cb_->OnMaxConn();
bool wasRemoved = Connections->RemoveOld(TInstant::Max());
if (!wasRemoved && Options_.RejectExcessConnections) {
(new TClientConnection(s, this, listenerSockAddrRef))->Reject();
@@ -166,27 +166,27 @@ public:
auto connection = new TClientConnection(s, this, listenerSockAddrRef);
connection->LastUsed = now;
connection->DeActivate();
- }
+ }
- void SaveErrorCode() {
+ void SaveErrorCode() {
ErrorCode = WSAGetLastError();
- }
+ }
int GetErrorCode() const {
return ErrorCode;
- }
+ }
const char* GetError() const {
return LastSystemErrorText(ErrorCode);
- }
+ }
- bool Start() {
- Poller.Reset(new TSocketPoller());
+ bool Start() {
+ Poller.Reset(new TSocketPoller());
Connections.Reset(new TConnections(Poller.Get(), Options_));
-
+
// Start the listener thread
- ListenerRunningOK = false;
-
+ ListenerRunningOK = false;
+
// throws on error
TPipeHandle::Pipe(ListenWakeupReadFd, ListenWakeupWriteFd);
@@ -195,31 +195,31 @@ public:
Poller->WaitRead(ListenWakeupReadFd, &WakeupPollAble);
- ListenStartEvent.Reset();
- try {
+ ListenStartEvent.Reset();
+ try {
ListenThread.Reset(new TThread(ListenSocketFunction, this));
ListenThread->Start();
- } catch (const yexception&) {
- SaveErrorCode();
- return false;
+ } catch (const yexception&) {
+ SaveErrorCode();
+ return false;
}
// Wait until the thread has completely started and return the success indicator
- ListenStartEvent.Wait();
+ ListenStartEvent.Wait();
- return ListenerRunningOK;
- }
+ return ListenerRunningOK;
+ }
- void Wait() {
- Cb_->OnWait();
+ void Wait() {
+ Cb_->OnWait();
TGuard<TMutex> g(StopMutex);
if (ListenThread) {
ListenThread->Join();
ListenThread.Reset(nullptr);
}
- }
+ }
- void Stop() {
+ void Stop() {
Shutdown();
TGuard<TMutex> g(StopMutex);
@@ -228,19 +228,19 @@ public:
ListenThread.Reset(nullptr);
}
- while (ConnectionCount) {
- usleep(10000);
- Connections->Clear();
+ while (ConnectionCount) {
+ usleep(10000);
+ Connections->Clear();
}
- Connections.Destroy();
- Poller.Destroy();
- }
+ Connections.Destroy();
+ Poller.Destroy();
+ }
- void Shutdown() {
+ void Shutdown() {
ListenWakeupWriteFd.Write("", 1);
// ignore result
- }
+ }
void AddRequest(TAutoPtr<TClientRequest> req, bool fail) {
struct TFailRequest: public THttpClientRequestEx {
@@ -257,20 +257,20 @@ public:
ProcessFailRequest(0);
return true;
}
- };
+ };
if (!fail && Requests->Add(req.Get())) {
Y_UNUSED(req.Release());
- } else {
- req = new TFailRequest(req);
+ } else {
+ req = new TFailRequest(req);
- if (FailRequests->Add(req.Get())) {
+ if (FailRequests->Add(req.Get())) {
Y_UNUSED(req.Release());
} else {
- Cb_->OnFailRequest(-1);
+ Cb_->OnFailRequest(-1);
}
}
- }
+ }
size_t GetRequestQueueSize() const {
return Requests->Size();
@@ -305,8 +305,8 @@ public:
if (s == INVALID_SOCKET) {
ythrow yexception() << "accept: " << LastSystemErrorText();
- }
-
+ }
+
Server_->AddRequestFromSocket(s, TInstant::Now(), SockAddrRef_);
}
@@ -318,13 +318,13 @@ public:
TSocket S_;
TImpl* Server_ = nullptr;
NAddr::IRemoteAddrRef SockAddrRef_;
- };
+ };
- void ListenSocket() {
+ void ListenSocket() {
TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str());
ErrorCode = 0;
- TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs;
+ TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs;
std::function<void(TSocket)> callback = [&](TSocket socket) {
THolder<TListenSocket> ls(new TListenSocket(socket, this));
@@ -337,65 +337,65 @@ public:
ListenStartEvent.Signal();
return;
- }
+ }
- Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
- FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize);
- Cb_->OnListenStart();
- ListenerRunningOK = true;
- ListenStartEvent.Signal();
+ Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
+ FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize);
+ Cb_->OnListenStart();
+ ListenerRunningOK = true;
+ ListenStartEvent.Signal();
TVector<void*> events;
- events.resize(1);
+ events.resize(1);
TInstant now = TInstant::Now();
for (;;) {
- try {
+ try {
const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout;
const size_t ret = Poller->WaitD(events.data(), events.size(), deadline);
now = TInstant::Now();
- for (size_t i = 0; i < ret; ++i) {
+ for (size_t i = 0; i < ret; ++i) {
((IPollAble*)events[i])->OnPollEvent(now);
- }
+ }
if (ret == 0 && Options_.ExpirationTimeout > TDuration::Zero()) {
Connections->RemoveOld(now - Options_.ExpirationTimeout);
}
// When MaxConnections is limited or ExpirationTimeout is set, OnPollEvent can call
- // RemoveOld and destroy other IPollAble* objects in the
- // poller. Thus in this case we can safely process only one
- // event from the poller at a time.
+ // RemoveOld and destroy other IPollAble* objects in the
+ // poller. Thus in this case we can safely process only one
+ // event from the poller at a time.
if (!Options_.MaxConnections && Options_.ExpirationTimeout == TDuration::Zero()) {
if (ret >= events.size()) {
- events.resize(ret * 2);
+ events.resize(ret * 2);
}
}
} catch (const TShouldStop&) {
break;
- } catch (...) {
- Cb_->OnException();
+ } catch (...) {
+ Cb_->OnException();
}
}
- while (!Reqs.Empty()) {
- THolder<TListenSocket> ls(Reqs.PopFront());
+ while (!Reqs.Empty()) {
+ THolder<TListenSocket> ls(Reqs.PopFront());
- Poller->Unwait(ls->GetSocket());
+ Poller->Unwait(ls->GetSocket());
}
- Requests->Stop();
- FailRequests->Stop();
- Cb_->OnListenStop();
- }
+ Requests->Stop();
+ FailRequests->Stop();
+ Cb_->OnListenStop();
+ }
- void RestartRequestThreads(ui32 nTh, ui32 maxQS) {
- Requests->Stop();
- Options_.nThreads = nTh;
- Options_.MaxQueueSize = maxQS;
- Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
- }
+ void RestartRequestThreads(ui32 nTh, ui32 maxQS) {
+ Requests->Stop();
+ Options_.nThreads = nTh;
+ Options_.MaxQueueSize = maxQS;
+ Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
+ }
TImpl(THttpServer* parent, ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options_)
: Requests(mainWorkers)
@@ -415,29 +415,29 @@ public:
options) {
}
- ~TImpl() {
- try {
- Stop();
- } catch (...) {
+ ~TImpl() {
+ try {
+ Stop();
+ } catch (...) {
}
- }
+ }
inline const TOptions& Options() const noexcept {
- return Options_;
- }
+ return Options_;
+ }
inline void DecreaseConnections() noexcept {
- AtomicDecrement(ConnectionCount);
- }
+ AtomicDecrement(ConnectionCount);
+ }
inline void IncreaseConnections() noexcept {
- AtomicIncrement(ConnectionCount);
- }
-
- inline i64 GetClientCount() const {
- return AtomicGet(ConnectionCount);
- }
-
+ AtomicIncrement(ConnectionCount);
+ }
+
+ inline i64 GetClientCount() const {
+ return AtomicGet(ConnectionCount);
+ }
+
inline bool MaxRequestsReached() const {
return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections);
}
@@ -449,11 +449,11 @@ public:
TMtpQueueRef Requests;
TMtpQueueRef FailRequests;
TAtomic ConnectionCount = 0;
- THolder<TSocketPoller> Poller;
- THolder<TConnections> Connections;
+ THolder<TSocketPoller> Poller;
+ THolder<TConnections> Connections;
bool ListenerRunningOK = false;
int ErrorCode = 0;
- TOptions Options_;
+ TOptions Options_;
ICallBack* Cb_ = nullptr;
THttpServer* Parent_ = nullptr;
TWakeupPollAble WakeupPollAble;
@@ -571,7 +571,7 @@ TClientConnection::~TClientConnection() {
}
void TClientConnection::OnPollEvent(TInstant now) {
- THolder<TClientConnection> this_(this);
+ THolder<TClientConnection> this_(this);
Activate(now);
{
@@ -588,7 +588,7 @@ void TClientConnection::OnPollEvent(TInstant now) {
}
}
- THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_));
+ THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_));
AcceptMoment = now;
HttpServ_->AddRequest(obj, Reject_);
@@ -601,7 +601,7 @@ void TClientConnection::Activate(TInstant now) noexcept {
}
void TClientConnection::DeActivate() {
- HttpServ_->Connections->Add(this);
+ HttpServ_->Connections->Add(this);
}
void TClientConnection::Reject() {
@@ -677,7 +677,7 @@ void TClientRequest::ResetConnection() {
}
void TClientRequest::Process(void* ThreadSpecificResource) {
- THolder<TClientRequest> this_(this);
+ THolder<TClientRequest> this_(this);
auto* serverImpl = Conn_->HttpServ_;