aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/http/server
diff options
context:
space:
mode:
authorsomov <somov@yandex-team.ru>2022-02-10 16:45:47 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:47 +0300
commita5950576e397b1909261050b8c7da16db58f10b1 (patch)
tree7ba7677f6a4c3e19e2cefab34d16df2c8963b4d4 /library/cpp/http/server
parent81eddc8c0b55990194e112b02d127b87d54164a9 (diff)
downloadydb-a5950576e397b1909261050b8c7da16db58f10b1.tar.gz
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/http/server')
-rw-r--r--library/cpp/http/server/http.cpp292
-rw-r--r--library/cpp/http/server/http.h126
-rw-r--r--library/cpp/http/server/http_ut.cpp2
-rw-r--r--library/cpp/http/server/ut/ya.make2
4 files changed, 211 insertions, 211 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp
index 128583bdd7..fbd127a652 100644
--- a/library/cpp/http/server/http.cpp
+++ b/library/cpp/http/server/http.cpp
@@ -1,4 +1,4 @@
-#include "http.h"
+#include "http.h"
#include "http_ex.h"
#include <library/cpp/threading/equeue/equeue.h>
@@ -7,25 +7,25 @@
#include <util/generic/cast.h>
#include <util/generic/intrlist.h>
#include <util/generic/yexception.h>
-#include <util/network/address.h>
+#include <util/network/address.h>
#include <util/network/socket.h>
-#include <util/network/poller.h>
-#include <util/system/atomic.h>
-#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy
-#include <util/system/defaults.h>
-#include <util/system/event.h>
-#include <util/system/mutex.h>
+#include <util/network/poller.h>
+#include <util/system/atomic.h>
+#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy
+#include <util/system/defaults.h>
+#include <util/system/event.h>
+#include <util/system/mutex.h>
#include <util/system/pipe.h>
-#include <util/system/thread.h>
+#include <util/system/thread.h>
#include <util/thread/factory.h>
-#include <cerrno>
-#include <cstring>
-#include <ctime>
-
-#include <sys/stat.h>
-#include <sys/types.h>
+#include <cerrno>
+#include <cstring>
+#include <ctime>
+#include <sys/stat.h>
+#include <sys/types.h>
+
using namespace NAddr;
namespace {
@@ -51,18 +51,18 @@ namespace {
}
class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> {
-public:
+public:
TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef);
~TClientConnection() override;
void OnPollEvent(TInstant now) override;
inline void Activate(TInstant now) noexcept;
- inline void DeActivate();
+ inline void DeActivate();
inline void Reject();
-public:
- TSocket Socket_;
+public:
+ TSocket Socket_;
NAddr::IRemoteAddrRef ListenerSockAddrRef_;
THttpServer::TImpl* HttpServ_ = nullptr;
bool Reject_ = false;
@@ -72,70 +72,70 @@ public:
};
class THttpServer::TImpl {
-public:
- class TConnections {
+public:
+ class TConnections {
public:
inline TConnections(TSocketPoller* poller, const THttpServerOptions& options)
- : Poller_(poller)
+ : Poller_(poller)
, Options(options)
- {
- }
+ {
+ }
inline ~TConnections() {
- }
+ }
inline void Add(TClientConnection* c) noexcept {
- TGuard<TMutex> g(Mutex_);
+ TGuard<TMutex> g(Mutex_);
- Conns_.PushBack(c);
- Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c));
- }
+ Conns_.PushBack(c);
+ Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c));
+ }
inline void Erase(TClientConnection* c, TInstant now) noexcept {
- TGuard<TMutex> g(Mutex_);
+ TGuard<TMutex> g(Mutex_);
EraseUnsafe(c);
if (Options.ExpirationTimeout > TDuration::Zero()) {
TryRemovingUnsafe(now - Options.ExpirationTimeout);
}
- }
+ }
inline void Clear() noexcept {
- TGuard<TMutex> g(Mutex_);
+ TGuard<TMutex> g(Mutex_);
- Conns_.Clear();
- }
+ Conns_.Clear();
+ }
inline bool RemoveOld(TInstant border) noexcept {
- TGuard<TMutex> g(Mutex_);
+ TGuard<TMutex> g(Mutex_);
return TryRemovingUnsafe(border);
}
bool TryRemovingUnsafe(TInstant border) noexcept {
- if (Conns_.Empty()) {
+ if (Conns_.Empty()) {
return false;
- }
- TClientConnection* c = &*(Conns_.Begin());
+ }
+ TClientConnection* c = &*(Conns_.Begin());
if (c->LastUsed > border) {
return false;
}
EraseUnsafe(c);
- delete c;
+ delete c;
return true;
- }
+ }
void EraseUnsafe(TClientConnection* c) noexcept {
Poller_->Unwait(c->Socket_);
c->Unlink();
}
- public:
- TMutex Mutex_;
- TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_;
+ public:
+ TMutex Mutex_;
+ TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_;
TSocketPoller* Poller_ = nullptr;
const THttpServerOptions& Options;
- };
+ };
- static void* ListenSocketFunction(void* param) {
+ static void* ListenSocketFunction(void* param) {
try {
((TImpl*)param)->ListenSocket();
} catch (...) {
@@ -143,19 +143,19 @@ public:
}
return nullptr;
- }
+ }
- TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) {
- THolder<TClientRequest> obj(Cb_->CreateClient());
+ TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) {
+ THolder<TClientRequest> obj(Cb_->CreateClient());
- obj->Conn_.Reset(c.Release());
-
- return obj;
- }
+ obj->Conn_.Reset(c.Release());
+ return obj;
+ }
+
void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) {
if (MaxRequestsReached()) {
- Cb_->OnMaxConn();
+ Cb_->OnMaxConn();
bool wasRemoved = Connections->RemoveOld(TInstant::Max());
if (!wasRemoved && Options_.RejectExcessConnections) {
(new TClientConnection(s, this, listenerSockAddrRef))->Reject();
@@ -166,27 +166,27 @@ public:
auto connection = new TClientConnection(s, this, listenerSockAddrRef);
connection->LastUsed = now;
connection->DeActivate();
- }
+ }
- void SaveErrorCode() {
+ void SaveErrorCode() {
ErrorCode = WSAGetLastError();
- }
+ }
int GetErrorCode() const {
return ErrorCode;
- }
+ }
const char* GetError() const {
return LastSystemErrorText(ErrorCode);
- }
+ }
- bool Start() {
- Poller.Reset(new TSocketPoller());
+ bool Start() {
+ Poller.Reset(new TSocketPoller());
Connections.Reset(new TConnections(Poller.Get(), Options_));
-
+
// Start the listener thread
- ListenerRunningOK = false;
-
+ ListenerRunningOK = false;
+
// throws on error
TPipeHandle::Pipe(ListenWakeupReadFd, ListenWakeupWriteFd);
@@ -195,31 +195,31 @@ public:
Poller->WaitRead(ListenWakeupReadFd, &WakeupPollAble);
- ListenStartEvent.Reset();
- try {
+ ListenStartEvent.Reset();
+ try {
ListenThread.Reset(new TThread(ListenSocketFunction, this));
ListenThread->Start();
- } catch (const yexception&) {
- SaveErrorCode();
- return false;
+ } catch (const yexception&) {
+ SaveErrorCode();
+ return false;
}
// Wait until the thread has completely started and return the success indicator
- ListenStartEvent.Wait();
+ ListenStartEvent.Wait();
- return ListenerRunningOK;
- }
+ return ListenerRunningOK;
+ }
- void Wait() {
- Cb_->OnWait();
+ void Wait() {
+ Cb_->OnWait();
TGuard<TMutex> g(StopMutex);
if (ListenThread) {
ListenThread->Join();
ListenThread.Reset(nullptr);
}
- }
+ }
- void Stop() {
+ void Stop() {
Shutdown();
TGuard<TMutex> g(StopMutex);
@@ -228,19 +228,19 @@ public:
ListenThread.Reset(nullptr);
}
- while (ConnectionCount) {
- usleep(10000);
- Connections->Clear();
+ while (ConnectionCount) {
+ usleep(10000);
+ Connections->Clear();
}
- Connections.Destroy();
- Poller.Destroy();
- }
+ Connections.Destroy();
+ Poller.Destroy();
+ }
- void Shutdown() {
+ void Shutdown() {
ListenWakeupWriteFd.Write("", 1);
// ignore result
- }
+ }
void AddRequest(TAutoPtr<TClientRequest> req, bool fail) {
struct TFailRequest: public THttpClientRequestEx {
@@ -257,20 +257,20 @@ public:
ProcessFailRequest(0);
return true;
}
- };
+ };
if (!fail && Requests->Add(req.Get())) {
Y_UNUSED(req.Release());
- } else {
- req = new TFailRequest(req);
+ } else {
+ req = new TFailRequest(req);
- if (FailRequests->Add(req.Get())) {
+ if (FailRequests->Add(req.Get())) {
Y_UNUSED(req.Release());
} else {
- Cb_->OnFailRequest(-1);
+ Cb_->OnFailRequest(-1);
}
}
- }
+ }
size_t GetRequestQueueSize() const {
return Requests->Size();
@@ -305,8 +305,8 @@ public:
if (s == INVALID_SOCKET) {
ythrow yexception() << "accept: " << LastSystemErrorText();
- }
-
+ }
+
Server_->AddRequestFromSocket(s, TInstant::Now(), SockAddrRef_);
}
@@ -318,13 +318,13 @@ public:
TSocket S_;
TImpl* Server_ = nullptr;
NAddr::IRemoteAddrRef SockAddrRef_;
- };
+ };
- void ListenSocket() {
+ void ListenSocket() {
TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str());
ErrorCode = 0;
- TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs;
+ TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs;
std::function<void(TSocket)> callback = [&](TSocket socket) {
THolder<TListenSocket> ls(new TListenSocket(socket, this));
@@ -337,65 +337,65 @@ public:
ListenStartEvent.Signal();
return;
- }
+ }
- Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
- FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize);
- Cb_->OnListenStart();
- ListenerRunningOK = true;
- ListenStartEvent.Signal();
+ Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
+ FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize);
+ Cb_->OnListenStart();
+ ListenerRunningOK = true;
+ ListenStartEvent.Signal();
TVector<void*> events;
- events.resize(1);
+ events.resize(1);
TInstant now = TInstant::Now();
for (;;) {
- try {
+ try {
const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout;
const size_t ret = Poller->WaitD(events.data(), events.size(), deadline);
now = TInstant::Now();
- for (size_t i = 0; i < ret; ++i) {
+ for (size_t i = 0; i < ret; ++i) {
((IPollAble*)events[i])->OnPollEvent(now);
- }
+ }
if (ret == 0 && Options_.ExpirationTimeout > TDuration::Zero()) {
Connections->RemoveOld(now - Options_.ExpirationTimeout);
}
// When MaxConnections is limited or ExpirationTimeout is set, OnPollEvent can call
- // RemoveOld and destroy other IPollAble* objects in the
- // poller. Thus in this case we can safely process only one
- // event from the poller at a time.
+ // RemoveOld and destroy other IPollAble* objects in the
+ // poller. Thus in this case we can safely process only one
+ // event from the poller at a time.
if (!Options_.MaxConnections && Options_.ExpirationTimeout == TDuration::Zero()) {
if (ret >= events.size()) {
- events.resize(ret * 2);
+ events.resize(ret * 2);
}
}
} catch (const TShouldStop&) {
break;
- } catch (...) {
- Cb_->OnException();
+ } catch (...) {
+ Cb_->OnException();
}
}
- while (!Reqs.Empty()) {
- THolder<TListenSocket> ls(Reqs.PopFront());
+ while (!Reqs.Empty()) {
+ THolder<TListenSocket> ls(Reqs.PopFront());
- Poller->Unwait(ls->GetSocket());
+ Poller->Unwait(ls->GetSocket());
}
- Requests->Stop();
- FailRequests->Stop();
- Cb_->OnListenStop();
- }
+ Requests->Stop();
+ FailRequests->Stop();
+ Cb_->OnListenStop();
+ }
- void RestartRequestThreads(ui32 nTh, ui32 maxQS) {
- Requests->Stop();
- Options_.nThreads = nTh;
- Options_.MaxQueueSize = maxQS;
- Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
- }
+ void RestartRequestThreads(ui32 nTh, ui32 maxQS) {
+ Requests->Stop();
+ Options_.nThreads = nTh;
+ Options_.MaxQueueSize = maxQS;
+ Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
+ }
TImpl(THttpServer* parent, ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options_)
: Requests(mainWorkers)
@@ -415,29 +415,29 @@ public:
options) {
}
- ~TImpl() {
- try {
- Stop();
- } catch (...) {
+ ~TImpl() {
+ try {
+ Stop();
+ } catch (...) {
}
- }
+ }
inline const TOptions& Options() const noexcept {
- return Options_;
- }
+ return Options_;
+ }
inline void DecreaseConnections() noexcept {
- AtomicDecrement(ConnectionCount);
- }
+ AtomicDecrement(ConnectionCount);
+ }
inline void IncreaseConnections() noexcept {
- AtomicIncrement(ConnectionCount);
- }
-
- inline i64 GetClientCount() const {
- return AtomicGet(ConnectionCount);
- }
-
+ AtomicIncrement(ConnectionCount);
+ }
+
+ inline i64 GetClientCount() const {
+ return AtomicGet(ConnectionCount);
+ }
+
inline bool MaxRequestsReached() const {
return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections);
}
@@ -449,11 +449,11 @@ public:
TMtpQueueRef Requests;
TMtpQueueRef FailRequests;
TAtomic ConnectionCount = 0;
- THolder<TSocketPoller> Poller;
- THolder<TConnections> Connections;
+ THolder<TSocketPoller> Poller;
+ THolder<TConnections> Connections;
bool ListenerRunningOK = false;
int ErrorCode = 0;
- TOptions Options_;
+ TOptions Options_;
ICallBack* Cb_ = nullptr;
THttpServer* Parent_ = nullptr;
TWakeupPollAble WakeupPollAble;
@@ -571,7 +571,7 @@ TClientConnection::~TClientConnection() {
}
void TClientConnection::OnPollEvent(TInstant now) {
- THolder<TClientConnection> this_(this);
+ THolder<TClientConnection> this_(this);
Activate(now);
{
@@ -588,7 +588,7 @@ void TClientConnection::OnPollEvent(TInstant now) {
}
}
- THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_));
+ THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_));
AcceptMoment = now;
HttpServ_->AddRequest(obj, Reject_);
@@ -601,7 +601,7 @@ void TClientConnection::Activate(TInstant now) noexcept {
}
void TClientConnection::DeActivate() {
- HttpServ_->Connections->Add(this);
+ HttpServ_->Connections->Add(this);
}
void TClientConnection::Reject() {
@@ -677,7 +677,7 @@ void TClientRequest::ResetConnection() {
}
void TClientRequest::Process(void* ThreadSpecificResource) {
- THolder<TClientRequest> this_(this);
+ THolder<TClientRequest> this_(this);
auto* serverImpl = Conn_->HttpServ_;
diff --git a/library/cpp/http/server/http.h b/library/cpp/http/server/http.h
index b292d38f27..d0bc92ff7d 100644
--- a/library/cpp/http/server/http.h
+++ b/library/cpp/http/server/http.h
@@ -15,75 +15,75 @@ class TClientRequest;
class TClientConnection;
class THttpServer {
- friend class TClientRequest;
- friend class TClientConnection;
+ friend class TClientRequest;
+ friend class TClientConnection;
-public:
- class ICallBack {
+public:
+ class ICallBack {
public:
- struct TFailLogData {
- int failstate;
+ struct TFailLogData {
+ int failstate;
TString url;
- };
+ };
- virtual ~ICallBack() {
- }
+ virtual ~ICallBack() {
+ }
- virtual void OnFailRequest(int /*failstate*/) {
- }
+ virtual void OnFailRequest(int /*failstate*/) {
+ }
- virtual void OnFailRequestEx(const TFailLogData& d) {
- OnFailRequest(d.failstate);
- }
+ virtual void OnFailRequestEx(const TFailLogData& d) {
+ OnFailRequest(d.failstate);
+ }
- virtual void OnException() {
- }
+ virtual void OnException() {
+ }
- virtual void OnMaxConn() {
- }
+ virtual void OnMaxConn() {
+ }
- virtual TClientRequest* CreateClient() = 0;
+ virtual TClientRequest* CreateClient() = 0;
- virtual void OnListenStart() {
- }
+ virtual void OnListenStart() {
+ }
- virtual void OnListenStop() {
- }
+ virtual void OnListenStop() {
+ }
- virtual void OnWait() {
- }
+ virtual void OnWait() {
+ }
virtual void* CreateThreadSpecificResource() {
return nullptr;
- }
+ }
- virtual void DestroyThreadSpecificResource(void*) {
- }
- };
+ virtual void DestroyThreadSpecificResource(void*) {
+ }
+ };
- typedef THttpServerOptions TOptions;
+ typedef THttpServerOptions TOptions;
typedef TSimpleSharedPtr<IThreadPool> TMtpQueueRef;
THttpServer(ICallBack* cb, const TOptions& options = TOptions(), IThreadFactory* pool = nullptr);
THttpServer(ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options = TOptions());
- virtual ~THttpServer();
+ virtual ~THttpServer();
- bool Start();
+ bool Start();
- // shutdown a.s.a.p.
- void Stop();
+ // shutdown a.s.a.p.
+ void Stop();
// graceful shutdown with serving all already open connections
- void Shutdown();
+ void Shutdown();
- void Wait();
- int GetErrorCode();
- const char* GetError();
- void RestartRequestThreads(ui32 nTh, ui32 maxQS);
+ void Wait();
+ int GetErrorCode();
+ const char* GetError();
+ void RestartRequestThreads(ui32 nTh, ui32 maxQS);
const TOptions& Options() const noexcept;
- i64 GetClientCount() const;
+ i64 GetClientCount() const;
- class TImpl;
+ class TImpl;
size_t GetRequestQueueSize() const;
size_t GetFailQueueSize() const;
@@ -92,30 +92,30 @@ public:
static TAtomicBase AcceptReturnsInvalidSocketCounter();
-private:
+private:
bool MaxRequestsReached() const;
private:
- THolder<TImpl> Impl_;
+ THolder<TImpl> Impl_;
};
/**
* @deprecated Use TRequestReplier instead
*/
class TClientRequest: public IObjectInQueue {
- friend class THttpServer::TImpl;
+ friend class THttpServer::TImpl;
-public:
- TClientRequest();
+public:
+ TClientRequest();
~TClientRequest() override;
inline THttpInput& Input() noexcept {
- return *HttpConn_->Input();
- }
+ return *HttpConn_->Input();
+ }
inline THttpOutput& Output() noexcept {
- return *HttpConn_->Output();
- }
+ return *HttpConn_->Output();
+ }
THttpServer* HttpServ() const noexcept;
const TSocket& Socket() const noexcept;
@@ -123,29 +123,29 @@ public:
TInstant AcceptMoment() const noexcept;
bool IsLocal() const;
- bool CheckLoopback();
- void ProcessFailRequest(int failstate);
+ bool CheckLoopback();
+ void ProcessFailRequest(int failstate);
void ReleaseConnection();
void ResetConnection();
-private:
- /*
- * Processes the request after 'connection' been created and 'Headers' been read
- * Returns 'false' if the processing must be continued by the next handler,
- * 'true' otherwise ('this' will be deleted)
- */
- virtual bool Reply(void* ThreadSpecificResource);
+private:
+ /*
+ * Processes the request after 'connection' been created and 'Headers' been read
+ * Returns 'false' if the processing must be continued by the next handler,
+ * 'true' otherwise ('this' will be deleted)
+ */
+ virtual bool Reply(void* ThreadSpecificResource);
void Process(void* ThreadSpecificResource) override;
-public:
+public:
TVector<std::pair<TString, TString>> ParsedHeaders;
TString RequestString;
-private:
- THolder<TClientConnection> Conn_;
- THolder<THttpServerConn> HttpConn_;
+private:
+ THolder<TClientConnection> Conn_;
+ THolder<THttpServerConn> HttpConn_;
};
class TRequestReplier: public TClientRequest {
diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp
index cc62bb988e..5f21fc935c 100644
--- a/library/cpp/http/server/http_ut.cpp
+++ b/library/cpp/http/server/http_ut.cpp
@@ -330,7 +330,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true));
for (int i = 0; i < 2; ++i) {
- UNIT_ASSERT(server.Start());
+ UNIT_ASSERT(server.Start());
TTestRequest r(port);
r.Content = res;
diff --git a/library/cpp/http/server/ut/ya.make b/library/cpp/http/server/ut/ya.make
index bcb4d4c0b8..47a208e867 100644
--- a/library/cpp/http/server/ut/ya.make
+++ b/library/cpp/http/server/ut/ya.make
@@ -1,6 +1,6 @@
UNITTEST_FOR(library/cpp/http/server)
-OWNER(pg)
+OWNER(pg)
SIZE(MEDIUM)