summaryrefslogtreecommitdiffstats
path: root/library/cpp/http/server
diff options
context:
space:
mode:
authorDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/http/server
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/http/server')
-rw-r--r--library/cpp/http/server/conn.cpp69
-rw-r--r--library/cpp/http/server/conn.h37
-rw-r--r--library/cpp/http/server/http.cpp843
-rw-r--r--library/cpp/http/server/http.h176
-rw-r--r--library/cpp/http/server/http_ex.cpp107
-rw-r--r--library/cpp/http/server/http_ex.h28
-rw-r--r--library/cpp/http/server/http_ut.cpp739
-rw-r--r--library/cpp/http/server/options.cpp43
-rw-r--r--library/cpp/http/server/options.h176
-rw-r--r--library/cpp/http/server/response.cpp65
-rw-r--r--library/cpp/http/server/response.h82
-rw-r--r--library/cpp/http/server/response_ut.cpp142
-rw-r--r--library/cpp/http/server/ut/ya.make12
-rw-r--r--library/cpp/http/server/ya.make27
14 files changed, 2546 insertions, 0 deletions
diff --git a/library/cpp/http/server/conn.cpp b/library/cpp/http/server/conn.cpp
new file mode 100644
index 00000000000..38a76c4c309
--- /dev/null
+++ b/library/cpp/http/server/conn.cpp
@@ -0,0 +1,69 @@
+#include "conn.h"
+
+#include <util/network/socket.h>
+#include <util/stream/buffered.h>
+
+class THttpServerConn::TImpl {
+public:
+ inline TImpl(const TSocket& s, size_t outputBufferSize)
+ : S_(s)
+ , SI_(S_)
+ , SO_(S_)
+ , BO_(&SO_, outputBufferSize)
+ , HI_(&SI_)
+ , HO_(&BO_, &HI_)
+ {
+ }
+
+ inline ~TImpl() {
+ }
+
+ inline THttpInput* Input() noexcept {
+ return &HI_;
+ }
+
+ inline THttpOutput* Output() noexcept {
+ return &HO_;
+ }
+
+ inline void Reset() {
+ if (S_ != INVALID_SOCKET) {
+ // send RST packet to client
+ S_.SetLinger(true, 0);
+ S_.Close();
+ }
+ }
+
+private:
+ TSocket S_;
+ TSocketInput SI_;
+ TSocketOutput SO_;
+ TBufferedOutput BO_;
+ THttpInput HI_;
+ THttpOutput HO_;
+};
+
+THttpServerConn::THttpServerConn(const TSocket& s)
+ : THttpServerConn(s, s.MaximumTransferUnit())
+{
+}
+
+THttpServerConn::THttpServerConn(const TSocket& s, size_t outputBufferSize)
+ : Impl_(new TImpl(s, outputBufferSize))
+{
+}
+
+THttpServerConn::~THttpServerConn() {
+}
+
+THttpInput* THttpServerConn::Input() noexcept {
+ return Impl_->Input();
+}
+
+THttpOutput* THttpServerConn::Output() noexcept {
+ return Impl_->Output();
+}
+
+void THttpServerConn::Reset() {
+ return Impl_->Reset();
+}
diff --git a/library/cpp/http/server/conn.h b/library/cpp/http/server/conn.h
new file mode 100644
index 00000000000..3aa5329af42
--- /dev/null
+++ b/library/cpp/http/server/conn.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <library/cpp/http/io/stream.h>
+#include <util/generic/ptr.h>
+
+class TSocket;
+
+/// Потоки ввода/вывода для получения запросов и отправки ответов HTTP-сервера.
+class THttpServerConn {
+public:
+ explicit THttpServerConn(const TSocket& s);
+ THttpServerConn(const TSocket& s, size_t outputBufferSize);
+ ~THttpServerConn();
+
+ THttpInput* Input() noexcept;
+ THttpOutput* Output() noexcept;
+
+ inline const THttpInput* Input() const noexcept {
+ return const_cast<THttpServerConn*>(this)->Input();
+ }
+
+ inline const THttpOutput* Output() const noexcept {
+ return const_cast<THttpServerConn*>(this)->Output();
+ }
+
+ /// Проверяет, можно ли установить режим, при котором соединение с сервером
+ /// не завершается после окончания транзакции.
+ inline bool CanBeKeepAlive() const noexcept {
+ return Output()->CanBeKeepAlive();
+ }
+
+ void Reset();
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp
new file mode 100644
index 00000000000..128583bdd70
--- /dev/null
+++ b/library/cpp/http/server/http.cpp
@@ -0,0 +1,843 @@
+#include "http.h"
+#include "http_ex.h"
+
+#include <library/cpp/threading/equeue/equeue.h>
+
+#include <util/generic/buffer.h>
+#include <util/generic/cast.h>
+#include <util/generic/intrlist.h>
+#include <util/generic/yexception.h>
+#include <util/network/address.h>
+#include <util/network/socket.h>
+#include <util/network/poller.h>
+#include <util/system/atomic.h>
+#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy
+#include <util/system/defaults.h>
+#include <util/system/event.h>
+#include <util/system/mutex.h>
+#include <util/system/pipe.h>
+#include <util/system/thread.h>
+#include <util/thread/factory.h>
+
+#include <cerrno>
+#include <cstring>
+#include <ctime>
+
+#include <sys/stat.h>
+#include <sys/types.h>
+
+using namespace NAddr;
+
+namespace {
+ class IPollAble {
+ public:
+ inline IPollAble() noexcept {
+ }
+
+ virtual ~IPollAble() {
+ }
+
+ virtual void OnPollEvent(TInstant now) = 0;
+ };
+
+ struct TShouldStop {
+ };
+
+ struct TWakeupPollAble: public IPollAble {
+ void OnPollEvent(TInstant) override {
+ throw TShouldStop();
+ }
+ };
+}
+
+class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> {
+public:
+ TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef);
+ ~TClientConnection() override;
+
+ void OnPollEvent(TInstant now) override;
+
+ inline void Activate(TInstant now) noexcept;
+ inline void DeActivate();
+ inline void Reject();
+
+public:
+ TSocket Socket_;
+ NAddr::IRemoteAddrRef ListenerSockAddrRef_;
+ THttpServer::TImpl* HttpServ_ = nullptr;
+ bool Reject_ = false;
+ TInstant LastUsed;
+ TInstant AcceptMoment;
+ size_t ReceivedRequests = 0;
+};
+
+class THttpServer::TImpl {
+public:
+ class TConnections {
+ public:
+ inline TConnections(TSocketPoller* poller, const THttpServerOptions& options)
+ : Poller_(poller)
+ , Options(options)
+ {
+ }
+
+ inline ~TConnections() {
+ }
+
+ inline void Add(TClientConnection* c) noexcept {
+ TGuard<TMutex> g(Mutex_);
+
+ Conns_.PushBack(c);
+ Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c));
+ }
+
+ inline void Erase(TClientConnection* c, TInstant now) noexcept {
+ TGuard<TMutex> g(Mutex_);
+ EraseUnsafe(c);
+ if (Options.ExpirationTimeout > TDuration::Zero()) {
+ TryRemovingUnsafe(now - Options.ExpirationTimeout);
+ }
+ }
+
+ inline void Clear() noexcept {
+ TGuard<TMutex> g(Mutex_);
+
+ Conns_.Clear();
+ }
+
+ inline bool RemoveOld(TInstant border) noexcept {
+ TGuard<TMutex> g(Mutex_);
+ return TryRemovingUnsafe(border);
+ }
+
+ bool TryRemovingUnsafe(TInstant border) noexcept {
+ if (Conns_.Empty()) {
+ return false;
+ }
+ TClientConnection* c = &*(Conns_.Begin());
+ if (c->LastUsed > border) {
+ return false;
+ }
+ EraseUnsafe(c);
+ delete c;
+ return true;
+ }
+
+ void EraseUnsafe(TClientConnection* c) noexcept {
+ Poller_->Unwait(c->Socket_);
+ c->Unlink();
+ }
+
+ public:
+ TMutex Mutex_;
+ TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_;
+ TSocketPoller* Poller_ = nullptr;
+ const THttpServerOptions& Options;
+ };
+
+ static void* ListenSocketFunction(void* param) {
+ try {
+ ((TImpl*)param)->ListenSocket();
+ } catch (...) {
+
+ }
+
+ return nullptr;
+ }
+
+ TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) {
+ THolder<TClientRequest> obj(Cb_->CreateClient());
+
+ obj->Conn_.Reset(c.Release());
+
+ return obj;
+ }
+
+ void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) {
+ if (MaxRequestsReached()) {
+ Cb_->OnMaxConn();
+ bool wasRemoved = Connections->RemoveOld(TInstant::Max());
+ if (!wasRemoved && Options_.RejectExcessConnections) {
+ (new TClientConnection(s, this, listenerSockAddrRef))->Reject();
+ return;
+ }
+ }
+
+ auto connection = new TClientConnection(s, this, listenerSockAddrRef);
+ connection->LastUsed = now;
+ connection->DeActivate();
+ }
+
+ void SaveErrorCode() {
+ ErrorCode = WSAGetLastError();
+ }
+
+ int GetErrorCode() const {
+ return ErrorCode;
+ }
+
+ const char* GetError() const {
+ return LastSystemErrorText(ErrorCode);
+ }
+
+ bool Start() {
+ Poller.Reset(new TSocketPoller());
+ Connections.Reset(new TConnections(Poller.Get(), Options_));
+
+ // Start the listener thread
+ ListenerRunningOK = false;
+
+ // throws on error
+ TPipeHandle::Pipe(ListenWakeupReadFd, ListenWakeupWriteFd);
+
+ SetNonBlock(ListenWakeupWriteFd, true);
+ SetNonBlock(ListenWakeupReadFd, true);
+
+ Poller->WaitRead(ListenWakeupReadFd, &WakeupPollAble);
+
+ ListenStartEvent.Reset();
+ try {
+ ListenThread.Reset(new TThread(ListenSocketFunction, this));
+ ListenThread->Start();
+ } catch (const yexception&) {
+ SaveErrorCode();
+ return false;
+ }
+
+ // Wait until the thread has completely started and return the success indicator
+ ListenStartEvent.Wait();
+
+ return ListenerRunningOK;
+ }
+
+ void Wait() {
+ Cb_->OnWait();
+ TGuard<TMutex> g(StopMutex);
+ if (ListenThread) {
+ ListenThread->Join();
+ ListenThread.Reset(nullptr);
+ }
+ }
+
+ void Stop() {
+ Shutdown();
+
+ TGuard<TMutex> g(StopMutex);
+ if (ListenThread) {
+ ListenThread->Join();
+ ListenThread.Reset(nullptr);
+ }
+
+ while (ConnectionCount) {
+ usleep(10000);
+ Connections->Clear();
+ }
+
+ Connections.Destroy();
+ Poller.Destroy();
+ }
+
+ void Shutdown() {
+ ListenWakeupWriteFd.Write("", 1);
+ // ignore result
+ }
+
+ void AddRequest(TAutoPtr<TClientRequest> req, bool fail) {
+ struct TFailRequest: public THttpClientRequestEx {
+ inline TFailRequest(TAutoPtr<TClientRequest> parent) {
+ Conn_.Reset(parent->Conn_.Release());
+ HttpConn_.Reset(parent->HttpConn_.Release());
+ }
+
+ bool Reply(void*) override {
+ if (!ProcessHeaders()) {
+ return true;
+ }
+
+ ProcessFailRequest(0);
+ return true;
+ }
+ };
+
+ if (!fail && Requests->Add(req.Get())) {
+ Y_UNUSED(req.Release());
+ } else {
+ req = new TFailRequest(req);
+
+ if (FailRequests->Add(req.Get())) {
+ Y_UNUSED(req.Release());
+ } else {
+ Cb_->OnFailRequest(-1);
+ }
+ }
+ }
+
+ size_t GetRequestQueueSize() const {
+ return Requests->Size();
+ }
+
+ size_t GetFailQueueSize() const {
+ return FailRequests->Size();
+ }
+
+ const IThreadPool& GetRequestQueue() const {
+ return *Requests;
+ }
+
+ const IThreadPool& GetFailQueue() const {
+ return *FailRequests;
+ }
+
+ class TListenSocket: public IPollAble, public TIntrusiveListItem<TListenSocket> {
+ public:
+ inline TListenSocket(const TSocket& s, TImpl* parent)
+ : S_(s)
+ , Server_(parent)
+ , SockAddrRef_(GetSockAddr(S_))
+ {
+ }
+
+ ~TListenSocket() override {
+ }
+
+ void OnPollEvent(TInstant) override {
+ SOCKET s = ::accept(S_, nullptr, nullptr);
+
+ if (s == INVALID_SOCKET) {
+ ythrow yexception() << "accept: " << LastSystemErrorText();
+ }
+
+ Server_->AddRequestFromSocket(s, TInstant::Now(), SockAddrRef_);
+ }
+
+ SOCKET GetSocket() const noexcept {
+ return S_;
+ }
+
+ private:
+ TSocket S_;
+ TImpl* Server_ = nullptr;
+ NAddr::IRemoteAddrRef SockAddrRef_;
+ };
+
+ void ListenSocket() {
+ TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str());
+
+ ErrorCode = 0;
+ TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs;
+
+ std::function<void(TSocket)> callback = [&](TSocket socket) {
+ THolder<TListenSocket> ls(new TListenSocket(socket, this));
+ Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get()));
+ Reqs.PushBack(ls.Release());
+ };
+ bool addressesBound = TryToBindAddresses(Options_, &callback);
+ if (!addressesBound) {
+ SaveErrorCode();
+ ListenStartEvent.Signal();
+
+ return;
+ }
+
+ Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
+ FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize);
+ Cb_->OnListenStart();
+ ListenerRunningOK = true;
+ ListenStartEvent.Signal();
+
+ TVector<void*> events;
+ events.resize(1);
+
+ TInstant now = TInstant::Now();
+ for (;;) {
+ try {
+ const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout;
+ const size_t ret = Poller->WaitD(events.data(), events.size(), deadline);
+
+ now = TInstant::Now();
+ for (size_t i = 0; i < ret; ++i) {
+ ((IPollAble*)events[i])->OnPollEvent(now);
+ }
+
+ if (ret == 0 && Options_.ExpirationTimeout > TDuration::Zero()) {
+ Connections->RemoveOld(now - Options_.ExpirationTimeout);
+ }
+
+ // When MaxConnections is limited or ExpirationTimeout is set, OnPollEvent can call
+ // RemoveOld and destroy other IPollAble* objects in the
+ // poller. Thus in this case we can safely process only one
+ // event from the poller at a time.
+ if (!Options_.MaxConnections && Options_.ExpirationTimeout == TDuration::Zero()) {
+ if (ret >= events.size()) {
+ events.resize(ret * 2);
+ }
+ }
+ } catch (const TShouldStop&) {
+ break;
+ } catch (...) {
+ Cb_->OnException();
+ }
+ }
+
+ while (!Reqs.Empty()) {
+ THolder<TListenSocket> ls(Reqs.PopFront());
+
+ Poller->Unwait(ls->GetSocket());
+ }
+
+ Requests->Stop();
+ FailRequests->Stop();
+ Cb_->OnListenStop();
+ }
+
+ void RestartRequestThreads(ui32 nTh, ui32 maxQS) {
+ Requests->Stop();
+ Options_.nThreads = nTh;
+ Options_.MaxQueueSize = maxQS;
+ Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
+ }
+
+ TImpl(THttpServer* parent, ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options_)
+ : Requests(mainWorkers)
+ , FailRequests(failWorkers)
+ , Options_(options_)
+ , Cb_(cb)
+ , Parent_(parent)
+ {
+ }
+
+ TImpl(THttpServer* parent, ICallBack* cb, const TOptions& options, IThreadFactory* factory)
+ : TImpl(
+ parent,
+ cb,
+ MakeThreadPool<TSimpleThreadPool>(factory, options.UseElasticQueues, cb, options.RequestsThreadName),
+ MakeThreadPool<TThreadPool>(factory, options.UseElasticQueues, nullptr, options.FailRequestsThreadName),
+ options) {
+ }
+
+ ~TImpl() {
+ try {
+ Stop();
+ } catch (...) {
+ }
+ }
+
+ inline const TOptions& Options() const noexcept {
+ return Options_;
+ }
+
+ inline void DecreaseConnections() noexcept {
+ AtomicDecrement(ConnectionCount);
+ }
+
+ inline void IncreaseConnections() noexcept {
+ AtomicIncrement(ConnectionCount);
+ }
+
+ inline i64 GetClientCount() const {
+ return AtomicGet(ConnectionCount);
+ }
+
+ inline bool MaxRequestsReached() const {
+ return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections);
+ }
+
+ THolder<TThread> ListenThread;
+ TPipeHandle ListenWakeupReadFd;
+ TPipeHandle ListenWakeupWriteFd;
+ TSystemEvent ListenStartEvent;
+ TMtpQueueRef Requests;
+ TMtpQueueRef FailRequests;
+ TAtomic ConnectionCount = 0;
+ THolder<TSocketPoller> Poller;
+ THolder<TConnections> Connections;
+ bool ListenerRunningOK = false;
+ int ErrorCode = 0;
+ TOptions Options_;
+ ICallBack* Cb_ = nullptr;
+ THttpServer* Parent_ = nullptr;
+ TWakeupPollAble WakeupPollAble;
+ TMutex StopMutex;
+
+private:
+ template <class TThreadPool_>
+ static THolder<IThreadPool> MakeThreadPool(IThreadFactory* factory, bool elastic, ICallBack* callback = nullptr, const TString& threadName = {}) {
+ if (!factory) {
+ factory = SystemThreadFactory();
+ }
+
+ THolder<IThreadPool> pool;
+ const auto params = IThreadPool::TParams().SetFactory(factory).SetThreadName(threadName);
+ if (callback) {
+ pool = MakeHolder<TThreadPoolBinder<TThreadPool_, THttpServer::ICallBack>>(callback, params);
+ } else {
+ pool = MakeHolder<TThreadPool_>(params);
+ }
+
+ if (elastic) {
+ pool = MakeHolder<TElasticQueue>(std::move(pool));
+ }
+
+ return pool;
+ }
+};
+
+THttpServer::THttpServer(ICallBack* cb, const TOptions& options, IThreadFactory* pool)
+ : Impl_(new TImpl(this, cb, options, pool))
+{
+}
+
+THttpServer::THttpServer(ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options)
+ : Impl_(new TImpl(this, cb, mainWorkers, failWorkers, options))
+{
+}
+
+THttpServer::~THttpServer() {
+}
+
+i64 THttpServer::GetClientCount() const {
+ return Impl_->GetClientCount();
+}
+
+bool THttpServer::Start() {
+ return Impl_->Start();
+}
+
+void THttpServer::Stop() {
+ Impl_->Stop();
+}
+
+void THttpServer::Shutdown() {
+ Impl_->Shutdown();
+}
+
+void THttpServer::Wait() {
+ Impl_->Wait();
+}
+
+int THttpServer::GetErrorCode() {
+ return Impl_->GetErrorCode();
+}
+
+const char* THttpServer::GetError() {
+ return Impl_->GetError();
+}
+
+void THttpServer::RestartRequestThreads(ui32 n, ui32 queue) {
+ Impl_->RestartRequestThreads(n, queue);
+}
+
+const THttpServer::TOptions& THttpServer::Options() const noexcept {
+ return Impl_->Options();
+}
+
+size_t THttpServer::GetRequestQueueSize() const {
+ return Impl_->GetRequestQueueSize();
+}
+
+size_t THttpServer::GetFailQueueSize() const {
+ return Impl_->GetFailQueueSize();
+}
+
+const IThreadPool& THttpServer::GetRequestQueue() const {
+ return Impl_->GetRequestQueue();
+}
+
+const IThreadPool& THttpServer::GetFailQueue() const {
+ return Impl_->GetFailQueue();
+}
+
+bool THttpServer::MaxRequestsReached() const {
+ return Impl_->MaxRequestsReached();
+}
+
+TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef)
+ : Socket_(s)
+ , ListenerSockAddrRef_(listenerSockAddrRef)
+ , HttpServ_(serv)
+{
+ SetNoDelay(Socket_, true);
+
+ const TDuration& clientTimeout = HttpServ_->Options().ClientTimeout;
+ if (clientTimeout != TDuration::Zero()) {
+ SetSocketTimeout(Socket_, (long)clientTimeout.Seconds(), clientTimeout.MilliSecondsOfSecond());
+ }
+
+ HttpServ_->IncreaseConnections();
+}
+
+TClientConnection::~TClientConnection() {
+ HttpServ_->DecreaseConnections();
+}
+
+void TClientConnection::OnPollEvent(TInstant now) {
+ THolder<TClientConnection> this_(this);
+ Activate(now);
+
+ {
+ char tmp[1];
+
+ if (::recv(Socket_, tmp, 1, MSG_PEEK) < 1) {
+ /*
+ * We can received a FIN so our socket was moved to
+ * TCP_CLOSE_WAIT state. Check it before adding work
+ * for this socket.
+ */
+
+ return;
+ }
+ }
+
+ THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_));
+ AcceptMoment = now;
+
+ HttpServ_->AddRequest(obj, Reject_);
+}
+
+void TClientConnection::Activate(TInstant now) noexcept {
+ HttpServ_->Connections->Erase(this, now);
+ LastUsed = now;
+ ++ReceivedRequests;
+}
+
+void TClientConnection::DeActivate() {
+ HttpServ_->Connections->Add(this);
+}
+
+void TClientConnection::Reject() {
+ Reject_ = true;
+
+ HttpServ_->Connections->Add(this);
+}
+
+TClientRequest::TClientRequest() {
+}
+
+TClientRequest::~TClientRequest() {
+}
+
+bool TClientRequest::Reply(void* /*ThreadSpecificResource*/) {
+ if (strnicmp(RequestString.data(), "GET ", 4)) {
+ Output() << "HTTP/1.0 501 Not Implemented\r\n\r\n";
+ } else {
+ Output() << "HTTP/1.0 200 OK\r\n"
+ "Content-Type: text/html\r\n"
+ "\r\n"
+ "Hello World!\r\n";
+ }
+
+ return true;
+}
+
+bool TClientRequest::IsLocal() const {
+ return HasLocalAddress(Socket());
+}
+
+bool TClientRequest::CheckLoopback() {
+ bool isLocal = false;
+
+ try {
+ isLocal = IsLocal();
+ } catch (const yexception& e) {
+ Output() << "HTTP/1.0 500 Oops\r\n\r\n"
+ << e.what() << "\r\n";
+ return false;
+ }
+
+ if (!isLocal) {
+ Output() << "HTTP/1.0 403 Permission denied\r\n"
+ "Content-Type: text/html; charset=windows-1251\r\n"
+ "Connection: close\r\n"
+ "\r\n"
+ "<html><head><title>Permission denied</title></head>"
+ "<body><h1>Permission denied</h1>"
+ "<p>This request must be sent from the localhost.</p>"
+ "</body></html>\r\n";
+
+ return false;
+ }
+
+ return true;
+}
+
+void TClientRequest::ReleaseConnection() {
+ if (Conn_ && HttpConn_ && HttpServ()->Options().KeepAliveEnabled && HttpConn_->CanBeKeepAlive() && (!HttpServ()->Options().RejectExcessConnections || !HttpServ()->MaxRequestsReached())) {
+ Output().Finish();
+ Conn_->DeActivate();
+ Y_UNUSED(Conn_.Release());
+ }
+}
+
+void TClientRequest::ResetConnection() {
+ if (HttpConn_) {
+ // send RST packet to client
+ HttpConn_->Reset();
+ HttpConn_.Destroy();
+ }
+}
+
+void TClientRequest::Process(void* ThreadSpecificResource) {
+ THolder<TClientRequest> this_(this);
+
+ auto* serverImpl = Conn_->HttpServ_;
+
+ try {
+ if (!HttpConn_) {
+ const size_t outputBufferSize = HttpServ()->Options().OutputBufferSize;
+ if (outputBufferSize) {
+ HttpConn_.Reset(new THttpServerConn(Socket(), outputBufferSize));
+ } else {
+ HttpConn_.Reset(new THttpServerConn(Socket()));
+ }
+
+ auto maxRequestsPerConnection = HttpServ()->Options().MaxRequestsPerConnection;
+ HttpConn_->Output()->EnableKeepAlive(HttpServ()->Options().KeepAliveEnabled && (!maxRequestsPerConnection || Conn_->ReceivedRequests < maxRequestsPerConnection));
+ HttpConn_->Output()->EnableCompression(HttpServ()->Options().CompressionEnabled);
+ }
+
+ if (ParsedHeaders.empty()) {
+ RequestString = Input().FirstLine();
+
+ const THttpHeaders& h = Input().Headers();
+ ParsedHeaders.reserve(h.Count());
+ for (THttpHeaders::TConstIterator it = h.Begin(); it != h.End(); ++it) {
+ ParsedHeaders.emplace_back(it->Name(), it->Value());
+ }
+ }
+
+ if (Reply(ThreadSpecificResource)) {
+ ReleaseConnection();
+
+ /*
+ * *this will be destroyed...
+ */
+
+ return;
+ }
+ } catch (...) {
+ serverImpl->Cb_->OnException();
+
+ throw;
+ }
+
+ Y_UNUSED(this_.Release());
+}
+
+void TClientRequest::ProcessFailRequest(int failstate) {
+ Output() << "HTTP/1.1 503 Service Unavailable\r\n"
+ "Content-Type: text/plain\r\n"
+ "Content-Length: 21\r\n"
+ "\r\n"
+ "Service Unavailable\r\n";
+
+ TString url;
+
+ if (!strnicmp(RequestString.data(), "GET ", 4)) {
+ // Trying to extract url...
+ const char* str = RequestString.data();
+
+ // Skipping spaces before url...
+ size_t start = 3;
+ while (str[start] == ' ') {
+ ++start;
+ }
+
+ if (str[start]) {
+ // Traversing url...
+ size_t idx = start;
+
+ while (str[idx] != ' ' && str[idx]) {
+ ++idx;
+ }
+
+ url = RequestString.substr(start, idx - start);
+ }
+ }
+
+ const THttpServer::ICallBack::TFailLogData d = {
+ failstate,
+ url,
+ };
+
+ // Handling failure...
+ Conn_->HttpServ_->Cb_->OnFailRequestEx(d);
+ Output().Flush();
+}
+
+THttpServer* TClientRequest::HttpServ() const noexcept {
+ return Conn_->HttpServ_->Parent_;
+}
+
+const TSocket& TClientRequest::Socket() const noexcept {
+ return Conn_->Socket_;
+}
+
+NAddr::IRemoteAddrRef TClientRequest::GetListenerSockAddrRef() const noexcept {
+ return Conn_->ListenerSockAddrRef_;
+}
+
+TInstant TClientRequest::AcceptMoment() const noexcept {
+ return Conn_->AcceptMoment;
+}
+
+/*
+ * TRequestReplier
+ */
+TRequestReplier::TRequestReplier() {
+}
+
+TRequestReplier::~TRequestReplier() {
+}
+
+bool TRequestReplier::Reply(void* threadSpecificResource) {
+ const TReplyParams params = {
+ threadSpecificResource, Input(), Output()};
+
+ return DoReply(params);
+}
+
+bool TryToBindAddresses(const THttpServerOptions& options, const std::function<void(TSocket)>* callbackOnBoundAddress) {
+ THttpServerOptions::TBindAddresses addrs;
+ try {
+ options.BindAddresses(addrs);
+ } catch (const std::exception&) {
+ return false;
+ }
+
+ for (const auto& na : addrs) {
+ for (TNetworkAddress::TIterator ai = na.Begin(); ai != na.End(); ++ai) {
+ NAddr::TAddrInfo addr(&*ai);
+
+ TSocket socket(::socket(addr.Addr()->sa_family, SOCK_STREAM, 0));
+
+ if (socket == INVALID_SOCKET) {
+ return false;
+ }
+
+ FixIPv6ListenSocket(socket);
+
+ if (options.ReuseAddress) {
+ int yes = 1;
+ ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes));
+ }
+
+ if (options.ReusePort) {
+ SetReusePort(socket, true);
+ }
+
+ if (::bind(socket, addr.Addr(), addr.Len()) == SOCKET_ERROR) {
+ return false;
+ }
+
+ if (::listen(socket, options.ListenBacklog) == SOCKET_ERROR) {
+ return false;
+ }
+
+ if (callbackOnBoundAddress != nullptr) {
+ (*callbackOnBoundAddress)(socket);
+ }
+ }
+ }
+
+ return true;
+}
diff --git a/library/cpp/http/server/http.h b/library/cpp/http/server/http.h
new file mode 100644
index 00000000000..b292d38f270
--- /dev/null
+++ b/library/cpp/http/server/http.h
@@ -0,0 +1,176 @@
+#pragma once
+
+#include "conn.h"
+#include "options.h"
+
+#include <util/thread/pool.h>
+#include <library/cpp/http/io/stream.h>
+#include <util/memory/blob.h>
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+#include <util/system/atomic.h>
+
+class IThreadFactory;
+class TClientRequest;
+class TClientConnection;
+
+class THttpServer {
+ friend class TClientRequest;
+ friend class TClientConnection;
+
+public:
+ class ICallBack {
+ public:
+ struct TFailLogData {
+ int failstate;
+ TString url;
+ };
+
+ virtual ~ICallBack() {
+ }
+
+ virtual void OnFailRequest(int /*failstate*/) {
+ }
+
+ virtual void OnFailRequestEx(const TFailLogData& d) {
+ OnFailRequest(d.failstate);
+ }
+
+ virtual void OnException() {
+ }
+
+ virtual void OnMaxConn() {
+ }
+
+ virtual TClientRequest* CreateClient() = 0;
+
+ virtual void OnListenStart() {
+ }
+
+ virtual void OnListenStop() {
+ }
+
+ virtual void OnWait() {
+ }
+
+ virtual void* CreateThreadSpecificResource() {
+ return nullptr;
+ }
+
+ virtual void DestroyThreadSpecificResource(void*) {
+ }
+ };
+
+ typedef THttpServerOptions TOptions;
+ typedef TSimpleSharedPtr<IThreadPool> TMtpQueueRef;
+
+ THttpServer(ICallBack* cb, const TOptions& options = TOptions(), IThreadFactory* pool = nullptr);
+ THttpServer(ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options = TOptions());
+ virtual ~THttpServer();
+
+ bool Start();
+
+ // shutdown a.s.a.p.
+ void Stop();
+
+ // graceful shutdown with serving all already open connections
+ void Shutdown();
+
+ void Wait();
+ int GetErrorCode();
+ const char* GetError();
+ void RestartRequestThreads(ui32 nTh, ui32 maxQS);
+ const TOptions& Options() const noexcept;
+ i64 GetClientCount() const;
+
+ class TImpl;
+ size_t GetRequestQueueSize() const;
+ size_t GetFailQueueSize() const;
+
+ const IThreadPool& GetRequestQueue() const;
+ const IThreadPool& GetFailQueue() const;
+
+ static TAtomicBase AcceptReturnsInvalidSocketCounter();
+
+private:
+ bool MaxRequestsReached() const;
+
+private:
+ THolder<TImpl> Impl_;
+};
+
+/**
+ * @deprecated Use TRequestReplier instead
+ */
+class TClientRequest: public IObjectInQueue {
+ friend class THttpServer::TImpl;
+
+public:
+ TClientRequest();
+ ~TClientRequest() override;
+
+ inline THttpInput& Input() noexcept {
+ return *HttpConn_->Input();
+ }
+
+ inline THttpOutput& Output() noexcept {
+ return *HttpConn_->Output();
+ }
+
+ THttpServer* HttpServ() const noexcept;
+ const TSocket& Socket() const noexcept;
+ NAddr::IRemoteAddrRef GetListenerSockAddrRef() const noexcept;
+ TInstant AcceptMoment() const noexcept;
+
+ bool IsLocal() const;
+ bool CheckLoopback();
+ void ProcessFailRequest(int failstate);
+
+ void ReleaseConnection();
+
+ void ResetConnection();
+
+private:
+ /*
+ * Processes the request after 'connection' been created and 'Headers' been read
+ * Returns 'false' if the processing must be continued by the next handler,
+ * 'true' otherwise ('this' will be deleted)
+ */
+ virtual bool Reply(void* ThreadSpecificResource);
+ void Process(void* ThreadSpecificResource) override;
+
+public:
+ TVector<std::pair<TString, TString>> ParsedHeaders;
+ TString RequestString;
+
+private:
+ THolder<TClientConnection> Conn_;
+ THolder<THttpServerConn> HttpConn_;
+};
+
+class TRequestReplier: public TClientRequest {
+public:
+ TRequestReplier();
+ ~TRequestReplier() override;
+
+ struct TReplyParams {
+ void* ThreadSpecificResource;
+ THttpInput& Input;
+ THttpOutput& Output;
+ };
+
+ /*
+ * Processes the request after 'connection' been created and 'Headers' been read
+ * Returns 'false' if the processing must be continued by the next handler,
+ * 'true' otherwise ('this' will be deleted)
+ */
+ virtual bool DoReply(const TReplyParams& params) = 0;
+
+private:
+ bool Reply(void* threadSpecificResource) final;
+
+ using TClientRequest::Input;
+ using TClientRequest::Output;
+};
+
+bool TryToBindAddresses(const THttpServerOptions& options, const std::function<void(TSocket)>* callbackOnBoundAddress = nullptr);
diff --git a/library/cpp/http/server/http_ex.cpp b/library/cpp/http/server/http_ex.cpp
new file mode 100644
index 00000000000..e07db22bfc8
--- /dev/null
+++ b/library/cpp/http/server/http_ex.cpp
@@ -0,0 +1,107 @@
+#include "http_ex.h"
+
+#include <util/generic/buffer.h>
+#include <util/generic/cast.h>
+#include <util/stream/null.h>
+
+bool THttpClientRequestExtension::Parse(char* req, TBaseServerRequestData& rd) {
+ rd.SetSocket(Socket());
+
+ if (!rd.Parse(req)) {
+ Output() << "HTTP/1.1 403 Forbidden\r\n"
+ "Content-Type: text/plain\r\n"
+ "Content-Length: 39\r\n"
+ "\r\n"
+ "The server cannot be used as a proxy.\r\n";
+
+ return false;
+ }
+
+ return true;
+}
+
+bool THttpClientRequestExtension::ProcessHeaders(TBaseServerRequestData& rd, TBlob& postData) {
+ for (const auto& header : ParsedHeaders) {
+ rd.AddHeader(header.first, header.second);
+ }
+
+ char* s = RequestString.begin();
+
+ enum EMethod {
+ NotImplemented,
+ Get,
+ Post,
+ Put,
+ Patch,
+ Delete,
+ };
+
+ enum EMethod foundMethod;
+ char* urlStart;
+
+ if (strnicmp(s, "GET ", 4) == 0) {
+ foundMethod = Get;
+ urlStart = s + 4;
+ } else if (strnicmp(s, "POST ", 5) == 0) {
+ foundMethod = Post;
+ urlStart = s + 5;
+ } else if (strnicmp(s, "PUT ", 4) == 0) {
+ foundMethod = Put;
+ urlStart = s + 4;
+ } else if (strnicmp(s, "PATCH ", 6) == 0) {
+ foundMethod = Patch;
+ urlStart = s + 6;
+ } else if (strnicmp(s, "DELETE ", 7) == 0) {
+ foundMethod = Delete;
+ urlStart = s + 7;
+ } else {
+ foundMethod = NotImplemented;
+ }
+
+ switch (foundMethod) {
+ case Get:
+ case Delete:
+ if (!Parse(urlStart, rd)) {
+ return false;
+ }
+ break;
+
+ case Post:
+ case Put:
+ case Patch:
+ try {
+ ui64 contentLength = 0;
+ if (Input().HasExpect100Continue()) {
+ Output().SendContinue();
+ }
+
+ if (!Input().ContentEncoded() && Input().GetContentLength(contentLength)) {
+ if (contentLength > HttpServ()->Options().MaxInputContentLength) {
+ Output() << "HTTP/1.1 413 Payload Too Large\r\nContent-Length:0\r\n\r\n";
+ Output().Finish();
+ return false;
+ }
+
+ TBuffer buf(SafeIntegerCast<size_t>(contentLength));
+ buf.Resize(Input().Load(buf.Data(), (size_t)contentLength));
+ postData = TBlob::FromBuffer(buf);
+ } else {
+ postData = TBlob::FromStream(Input());
+ }
+ } catch (...) {
+ Output() << "HTTP/1.1 400 Bad request\r\n\r\n";
+ return false;
+ }
+
+ if (!Parse(urlStart, rd)) {
+ return false;
+ }
+ break;
+
+ case NotImplemented:
+ Output() << "HTTP/1.1 501 Not Implemented\r\n\r\n";
+ return false;
+ }
+
+ return true;
+}
diff --git a/library/cpp/http/server/http_ex.h b/library/cpp/http/server/http_ex.h
new file mode 100644
index 00000000000..1ef43ea4fd5
--- /dev/null
+++ b/library/cpp/http/server/http_ex.h
@@ -0,0 +1,28 @@
+#pragma once
+
+#include "http.h"
+
+#include <library/cpp/http/misc/httpreqdata.h>
+
+class THttpClientRequestExtension: public TClientRequest {
+public:
+ bool Parse(char* req, TBaseServerRequestData& rd);
+ bool ProcessHeaders(TBaseServerRequestData& rd, TBlob& postData);
+};
+
+template <class TRequestData>
+class THttpClientRequestExtImpl: public THttpClientRequestExtension {
+protected:
+ bool Parse(char* req) {
+ return THttpClientRequestExtension::Parse(req, RD);
+ }
+ bool ProcessHeaders() {
+ return THttpClientRequestExtension::ProcessHeaders(RD, Buf);
+ }
+
+protected:
+ TRequestData RD;
+ TBlob Buf;
+};
+
+using THttpClientRequestEx = THttpClientRequestExtImpl<TServerRequestData>;
diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp
new file mode 100644
index 00000000000..cc62bb988e7
--- /dev/null
+++ b/library/cpp/http/server/http_ut.cpp
@@ -0,0 +1,739 @@
+#include "http.h"
+#include "http_ex.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/testing/unittest/tests_data.h>
+
+#include <util/generic/cast.h>
+#include <util/stream/output.h>
+#include <util/stream/zlib.h>
+#include <util/system/datetime.h>
+#include <util/system/sem.h>
+
+Y_UNIT_TEST_SUITE(THttpServerTest) {
+ class TEchoServer: public THttpServer::ICallBack {
+ class TRequest: public THttpClientRequestEx {
+ public:
+ inline TRequest(TEchoServer* parent)
+ : Parent_(parent)
+ {
+ }
+
+ bool Reply(void* /*tsr*/) override {
+ if (!ProcessHeaders()) {
+ return true;
+ }
+
+ Output() << "HTTP/1.1 200 Ok\r\n\r\n";
+ if (Buf.Size()) {
+ Output().Write(Buf.AsCharPtr(), Buf.Size());
+ } else {
+ Output() << Parent_->Res_;
+ }
+ Output().Finish();
+
+ return true;
+ }
+
+ private:
+ TEchoServer* Parent_ = nullptr;
+ };
+
+ public:
+ inline TEchoServer(const TString& res)
+ : Res_(res)
+ {
+ }
+
+ TClientRequest* CreateClient() override {
+ return new TRequest(this);
+ }
+
+ private:
+ TString Res_;
+ };
+
+ class TSleepingServer: public THttpServer::ICallBack {
+ class TReplier: public TRequestReplier {
+ public:
+ inline TReplier(TSleepingServer* server)
+ : Server(server)
+ {
+ }
+
+ bool DoReply(const TReplyParams& params) override {
+ Server->FreeThread();
+ Server->Busy(1);
+ params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo");
+ params.Output.Finish();
+ Server->Replies->Inc();
+ return true;
+ }
+
+ private:
+ TSleepingServer* Server = nullptr;
+ };
+
+ public:
+ inline TSleepingServer(unsigned int size)
+ : Semaphore("conns", size)
+ , Semaphore2("threads", 1)
+ , Replies(new TAtomicCounter())
+ , MaxConns(new TAtomicCounter())
+ {
+ }
+
+ void ResetCounters() {
+ Replies.Reset(new TAtomicCounter());
+ MaxConns.Reset(new TAtomicCounter());
+ }
+
+ long RepliesCount() const {
+ return Replies->Val();
+ }
+
+ long MaxConnsCount() const {
+ return MaxConns->Val();
+ }
+
+ TClientRequest* CreateClient() override {
+ return new TReplier(this);
+ }
+
+ void OnMaxConn() override {
+ MaxConns->Inc();
+ }
+
+ void OnFailRequest(int) override {
+ FreeThread();
+ Busy(1);
+ }
+
+ void Busy(int count) {
+ while (count-- > 0) {
+ Semaphore.Acquire();
+ }
+ }
+
+ void BusyThread() {
+ Semaphore2.Acquire();
+ }
+
+ void Free(int count) {
+ while (count-- > 0) {
+ Semaphore.Release();
+ }
+ }
+
+ void FreeThread() {
+ Semaphore2.Release();
+ }
+
+ private:
+ TSemaphore Semaphore;
+ TSemaphore Semaphore2;
+ THolder<TAtomicCounter> Replies;
+ THolder<TAtomicCounter> MaxConns;
+ };
+
+ static const TString CrLf = "\r\n";
+
+ struct TTestRequest {
+ TTestRequest(ui16 port, TString content = TString())
+ : Port(port)
+ , Content(std::move(content))
+ {
+ }
+
+ void CheckContinue(TSocketInput& si) {
+ if (Expect100Continue) {
+ TStringStream ss;
+ TString firstLine;
+ si.ReadLine(firstLine);
+ for (;;) {
+ TString buf;
+ si.ReadLine(buf);
+ if (buf.size() == 0) {
+ break;
+ }
+ ss << buf << CrLf;
+ }
+ UNIT_ASSERT_EQUAL(firstLine, "HTTP/1.1 100 Continue");
+ }
+ }
+
+ TString Execute() {
+ TSocket* s = nullptr;
+ THolder<TSocket> singleReqSocket;
+ if (KeepAliveConnection) {
+ if (!KeepAlivedSocket) {
+ KeepAlivedSocket = MakeHolder<TSocket>(TNetworkAddress("localhost", Port), TDuration::Seconds(10));
+ }
+ s = KeepAlivedSocket.Get();
+ } else {
+ TNetworkAddress addr("localhost", Port);
+ singleReqSocket.Reset(new TSocket(addr, TDuration::Seconds(10)));
+ s = singleReqSocket.Get();
+ }
+ bool isPost = Type == "POST";
+ TSocketInput si(*s);
+
+ if (UseHttpOutput) {
+ TSocketOutput so(*s);
+ THttpOutput output(&so);
+
+ output.EnableKeepAlive(KeepAliveConnection);
+ output.EnableCompression(EnableResponseEncoding);
+
+ TStringStream r;
+ r << Type << " / HTTP/1.1" << CrLf;
+ r << "Host: localhost:" + ToString(Port) << CrLf;
+ if (isPost) {
+ if (ContentEncoding.size()) {
+ r << "Content-Encoding: " << ContentEncoding << CrLf;
+ } else {
+ r << "Transfer-Encoding: chunked" << CrLf;
+ }
+ if (Expect100Continue) {
+ r << "Expect: 100-continue" << CrLf;
+ }
+ }
+
+ r << CrLf;
+ if (isPost) {
+ output.Write(r.Str());
+ output.Flush();
+ CheckContinue(si);
+ output.Write(Content);
+ output.Finish();
+ } else {
+ output.Write(r.Str());
+ output.Finish();
+ }
+ } else {
+ TStringStream r;
+ r << Type << " / HTTP/1.1" << CrLf;
+ r << "Host: localhost:" + ToString(Port) << CrLf;
+ if (KeepAliveConnection) {
+ r << "Connection: Keep-Alive" << CrLf;
+ } else {
+ r << "Connection: Close" << CrLf;
+ }
+ if (EnableResponseEncoding) {
+ r << "Accept-Encoding: gzip, deflate, x-gzip, x-deflate, y-lzo, y-lzf, y-lzq, y-bzip2, y-lzma" << CrLf;
+ }
+ if (isPost && Expect100Continue) {
+ r << "Expect: 100-continue" << CrLf;
+ }
+ if (isPost && ContentEncoding.size() && Content.size()) {
+ r << "Content-Encoding: " << ContentEncoding << CrLf;
+ TStringStream compressedContent;
+ {
+ TZLibCompress zlib(&compressedContent);
+ zlib.Write(Content.data(), Content.size());
+ zlib.Flush();
+ zlib.Finish();
+ }
+ r << "Content-Length: " << compressedContent.Size() << CrLf;
+ r << CrLf;
+ s->Send(r.Data(), r.Size());
+ CheckContinue(si);
+ Hdr = r.Str();
+ TString tosend = compressedContent.Str();
+ s->Send(tosend.data(), tosend.size());
+ } else {
+ if (isPost) {
+ r << "Content-Length: " << Content.size() << CrLf;
+ r << CrLf;
+ s->Send(r.Data(), r.Size());
+ CheckContinue(si);
+ Hdr = r.Str();
+ s->Send(Content.data(), Content.size());
+ } else {
+ r << CrLf;
+ Hdr = r.Str();
+ s->Send(r.Data(), r.Size());
+ }
+ }
+ }
+
+ THttpInput input(&si);
+ TStringStream ss;
+ TransferData(&input, &ss);
+
+ return ss.Str();
+ }
+
+ TString GetDescription() const {
+ if (UseHttpOutput) {
+ TStringStream ss;
+ ss << (KeepAliveConnection ? "keep-alive " : "") << Type;
+ if (ContentEncoding.size()) {
+ ss << " with encoding=" << ContentEncoding;
+ }
+ return ss.Str();
+ } else {
+ return Hdr;
+ }
+ }
+
+ ui16 Port = 0;
+ bool UseHttpOutput = true;
+ TString Type = "GET";
+ TString ContentEncoding;
+ TString Content;
+ bool KeepAliveConnection = false;
+ THolder<TSocket> KeepAlivedSocket;
+ bool EnableResponseEncoding = false;
+ TString Hdr;
+ bool Expect100Continue = false;
+ };
+
+ class TFailingMtpQueue: public TSimpleThreadPool {
+ private:
+ bool FailOnAdd_ = false;
+
+ public:
+ void SetFailOnAdd(bool fail = true) {
+ FailOnAdd_ = fail;
+ }
+ [[nodiscard]] bool Add(IObjectInQueue* pObj) override {
+ if (FailOnAdd_) {
+ return false;
+ }
+
+ return TSimpleThreadPool::Add(pObj);
+ }
+ TFailingMtpQueue() = default;
+ TFailingMtpQueue(IThreadFactory* pool)
+ : TSimpleThreadPool(pool)
+ {
+ }
+ };
+
+ TString TestData(size_t size = 5 * 4096) {
+ TString res;
+
+ for (size_t i = 0; i < size; ++i) {
+ res += (char)i;
+ }
+ return res;
+ }
+
+ Y_UNIT_TEST(TestEchoServer) {
+ TString res = TestData();
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+ const bool trueFalse[] = {true, false};
+
+ TEchoServer serverImpl(res);
+ THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true));
+
+ for (int i = 0; i < 2; ++i) {
+ UNIT_ASSERT(server.Start());
+
+ TTestRequest r(port);
+ r.Content = res;
+
+ for (bool keepAlive : trueFalse) {
+ r.KeepAliveConnection = keepAlive;
+
+ // THttpOutput use chunked stream, else use Content-Length
+ for (bool useHttpOutput : trueFalse) {
+ r.UseHttpOutput = useHttpOutput;
+
+ for (bool enableResponseEncoding : trueFalse) {
+ r.EnableResponseEncoding = enableResponseEncoding;
+
+ const TString reqTypes[] = {"GET", "POST"};
+ for (const TString& reqType : reqTypes) {
+ r.Type = reqType;
+
+ const TString encoders[] = {"", "deflate"};
+ for (const TString& encoder : encoders) {
+ r.ContentEncoding = encoder;
+
+ for (bool expect100Continue : trueFalse) {
+ r.Expect100Continue = expect100Continue;
+ TString resp = r.Execute();
+ UNIT_ASSERT_C(resp == res, "diff echo response for request:\n" + r.GetDescription());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ server.Stop();
+ }
+ }
+
+ Y_UNIT_TEST(TestReusePortEnabled) {
+ if (!IsReusePortAvailable()) {
+ return; // skip test
+ }
+ TString res = TestData();
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+
+ TEchoServer serverImpl(res);
+ TVector<THolder<THttpServer>> servers;
+ for (ui32 i = 0; i < 10; i++) {
+ servers.push_back(MakeHolder<THttpServer>(&serverImpl, THttpServer::TOptions(port).EnableReusePort(true)));
+ }
+
+ for (ui32 testRun = 0; testRun < 3; testRun++) {
+ for (auto& server : servers) {
+ // start servers one at a time and check at least one of them is replying
+ UNIT_ASSERT(server->Start());
+
+ TTestRequest r(port, res);
+ UNIT_ASSERT_C(r.Execute() == res, "diff echo response for request:\n" + r.GetDescription());
+ }
+
+ for (auto& server : servers) {
+ // ping servers and stop them one at a time
+ // at the last iteration only one server is still working and then gets stopped as well
+
+ TTestRequest r(port, res);
+ UNIT_ASSERT_C(r.Execute() == res, "diff echo response for request:\n" + r.GetDescription());
+
+ server->Stop();
+ }
+ }
+ }
+
+ Y_UNIT_TEST(TestReusePortDisabled) {
+ // check that with the ReusePort option disabled it's impossible to start two servers on the same port
+ // check that ReusePort option is disabled by default (don't set it explicitly in the test)
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+
+ TEchoServer serverImpl(TString{});
+ THttpServer server1(&serverImpl, THttpServer::TOptions(port));
+ THttpServer server2(&serverImpl, THttpServer::TOptions(port));
+
+ UNIT_ASSERT(true == server1.Start());
+ UNIT_ASSERT(false == server2.Start());
+
+ server1.Stop();
+ // Stop() is a sync call, port should be free by now
+ UNIT_ASSERT(true == server2.Start());
+ UNIT_ASSERT(false == server1.Start());
+ }
+
+ Y_UNIT_TEST(TestFailServer) {
+ /**
+ * Emulate request processing failures
+ * Data should be large enough not to fit into socket buffer
+ **/
+ TString res = TestData(10 * 1024 * 1024);
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+ TEchoServer serverImpl(res);
+ THttpServer::TOptions options(port);
+ options.EnableKeepAlive(true);
+ options.EnableCompression(true);
+ using TFailingServerMtpQueue = TThreadPoolBinder<TFailingMtpQueue, THttpServer::ICallBack>;
+ THttpServer::TMtpQueueRef mainWorkers = new TFailingServerMtpQueue(&serverImpl, SystemThreadFactory());
+ THttpServer::TMtpQueueRef failWorkers = new TThreadPool(SystemThreadFactory());
+ THttpServer server(&serverImpl, mainWorkers, failWorkers, options);
+
+ UNIT_ASSERT(server.Start());
+ for (size_t i = 0; i < 3; ++i) {
+ // should fail on 2nd request
+ static_cast<TFailingMtpQueue*>(mainWorkers.Get())->SetFailOnAdd(i == 1);
+ TTestRequest r(port);
+ r.Content = res;
+ r.Type = "POST";
+ TString resp = r.Execute();
+ if (i == 1) {
+ UNIT_ASSERT(resp.Contains("Service Unavailable"));
+ } else {
+ UNIT_ASSERT_C(resp == res, "diff echo response for request:\n" + r.GetDescription());
+ }
+ }
+ server.Stop();
+ }
+
+ class TReleaseConnectionServer: public THttpServer::ICallBack {
+ class TRequest: public THttpClientRequestEx {
+ public:
+ bool Reply(void* /*tsr*/) override {
+ Output() << "HTTP/1.1 200 Ok\r\n\r\n";
+ Output() << "reply";
+ Output().Finish();
+
+ ReleaseConnection();
+
+ throw yexception() << "some error";
+
+ return true;
+ }
+ };
+
+ public:
+ TClientRequest* CreateClient() override {
+ return new TRequest();
+ }
+
+ void OnException() override {
+ ExceptionMessage = CurrentExceptionMessage();
+ }
+
+ TString ExceptionMessage;
+ };
+
+ class TResetConnectionServer: public THttpServer::ICallBack {
+ class TRequest: public TClientRequest {
+ public:
+ bool Reply(void* /*tsr*/) override {
+ Output() << "HTTP/1.1";
+ ResetConnection();
+
+ return true;
+ }
+ };
+
+ public:
+ TClientRequest* CreateClient() override {
+ return new TRequest();
+ }
+
+ void OnException() override {
+ ExceptionMessage = CurrentExceptionMessage();
+ }
+
+ TString ExceptionMessage;
+ };
+
+ class TListenerSockAddrReplyServer: public THttpServer::ICallBack {
+ class TRequest: public TClientRequest {
+ public:
+ bool Reply(void* /*tsr*/) override {
+ Output() << "HTTP/1.1 200 Ok\r\n\r\n";
+ Output() << PrintHostAndPort(*GetListenerSockAddrRef());
+
+ Output().Finish();
+
+ return true;
+ }
+ };
+
+ public:
+ TClientRequest* CreateClient() override {
+ return new TRequest();
+ }
+ };
+
+ Y_UNIT_TEST(TTestResetConnection) {
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+
+ TResetConnectionServer serverImpl;
+ THttpServer server(&serverImpl, THttpServer::TOptions(port));
+ UNIT_ASSERT(server.Start());
+
+ TTestRequest r(port, "request");
+
+ UNIT_ASSERT_EXCEPTION_CONTAINS(r.Execute(), TSystemError, "Connection reset by peer");
+
+ server.Stop();
+ };
+
+ Y_UNIT_TEST(TTestReleaseConnection) {
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+
+ TReleaseConnectionServer serverImpl;
+ THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true));
+ UNIT_ASSERT(server.Start());
+
+ TTestRequest r(port, "request");
+ r.KeepAliveConnection = true;
+
+ UNIT_ASSERT_C(r.Execute() == "reply", "diff echo response for request:\n" + r.GetDescription());
+
+ server.Stop();
+
+ UNIT_ASSERT_STRINGS_EQUAL(serverImpl.ExceptionMessage, "(yexception) some error");
+ };
+
+ THttpInput SendRequest(TSocket& socket, ui16 port) {
+ TSocketInput si(socket);
+ TSocketOutput so(socket);
+ THttpOutput out(&so);
+ out.EnableKeepAlive(true);
+ out << "GET / HTTP/1.1" << CrLf;
+ out << "Host: localhost:" + ToString(port) << CrLf;
+ out << CrLf;
+ out.Flush();
+
+ THttpInput input(&si);
+ input.ReadAll();
+ return input;
+ }
+
+ THttpInput SendRequestWithBody(TSocket& socket, ui16 port, TString body) {
+ TSocketInput si(socket);
+ TSocketOutput so(socket);
+ THttpOutput out(&so);
+ out << "POST / HTTP/1.1" << CrLf;
+ out << "Host: localhost:" + ToString(port) << CrLf;
+ out << "Content-Length: " + ToString(body.size()) << CrLf;
+ out << CrLf;
+ out << body;
+ out.Flush();
+
+ THttpInput input(&si);
+ input.ReadAll();
+ return input;
+ }
+
+ Y_UNIT_TEST(TTestExpirationTimeout) {
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+
+ TEchoServer serverImpl("test_data");
+ THttpServer::TOptions options(port);
+ options.nThreads = 1;
+ options.MaxQueueSize = 0;
+ options.MaxConnections = 0;
+ options.KeepAliveEnabled = true;
+ options.ExpirationTimeout = TDuration::Seconds(1);
+ options.PollTimeout = TDuration::MilliSeconds(100);
+ THttpServer server(&serverImpl, options);
+ UNIT_ASSERT(server.Start());
+
+ TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(10));
+
+ SendRequest(socket, port);
+ SendRequest(socket, port);
+
+ Sleep(TDuration::Seconds(5));
+ UNIT_ASSERT_EXCEPTION(SendRequest(socket, port), THttpReadException);
+
+ server.Stop();
+ }
+
+ Y_UNIT_TEST(TTestContentLengthTooLarge) {
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+
+ TEchoServer serverImpl("test_data");
+ THttpServer::TOptions options(port);
+ options.nThreads = 1;
+ options.MaxQueueSize = 0;
+ options.MaxInputContentLength = 2_KB;
+ options.MaxConnections = 0;
+ options.KeepAliveEnabled = false;
+ options.ExpirationTimeout = TDuration::Seconds(1);
+ options.PollTimeout = TDuration::MilliSeconds(100);
+ THttpServer server(&serverImpl, options);
+ UNIT_ASSERT(server.Start());
+
+ TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(5));
+ UNIT_ASSERT_STRING_CONTAINS(SendRequestWithBody(socket, port, TString(1_KB, 'a')).FirstLine(), "HTTP/1.1 200 Ok");
+
+ TSocket socket2(TNetworkAddress("localhost", port), TDuration::Seconds(5));
+ UNIT_ASSERT_STRING_CONTAINS(SendRequestWithBody(socket2, port, TString(10_KB, 'a')).FirstLine(), "HTTP/1.1 413 Payload Too Large");
+
+ server.Stop();
+ }
+
+
+ Y_UNIT_TEST(TTestCloseConnectionOnRequestLimit) {
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+
+ TEchoServer serverImpl("test_data");
+ THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxRequestsPerConnection(2));
+ UNIT_ASSERT(server.Start());
+
+ TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(10));
+
+ UNIT_ASSERT(SendRequest(socket, port).IsKeepAlive());
+ UNIT_ASSERT(!SendRequest(socket, port).IsKeepAlive());
+
+ UNIT_ASSERT_EXCEPTION(SendRequest(socket, port), THttpReadException);
+
+ server.Stop();
+ }
+
+ Y_UNIT_TEST(TTestListenerSockAddrConnection) {
+ TPortManager pm;
+ const ui16 port1 = pm.GetPort();
+ const ui16 port2 = pm.GetPort();
+
+ TListenerSockAddrReplyServer serverImpl;
+ THttpServer server(&serverImpl, THttpServer::TOptions().EnableKeepAlive(true).AddBindAddress("127.0.0.1", port1).AddBindAddress("127.0.0.1", port2));
+ UNIT_ASSERT(server.Start());
+
+ TTestRequest r1(port1);
+ r1.KeepAliveConnection = true;
+
+ TString resp = r1.Execute();
+ UNIT_ASSERT(resp == TString::Join("127.0.0.1", ":", ToString(port1)));
+
+ TTestRequest r2(port2);
+ r2.KeepAliveConnection = true;
+
+ resp = r2.Execute();
+ UNIT_ASSERT(resp == TString::Join("127.0.0.1", ":", ToString(port2)));
+
+ server.Stop();
+ };
+
+#if 0
+ Y_UNIT_TEST(TestSocketsLeak) {
+ const bool trueFalse[] = {true, false};
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+ TString res = TestData(25);
+ TSleepingServer server(3);
+ THttpServer::TOptions options(port);
+ options.MaxConnections = 1;
+ options.MaxQueueSize = 1;
+ options.MaxFQueueSize = 2;
+ options.nFThreads = 2;
+ options.KeepAliveEnabled = true;
+ options.RejectExcessConnections = true;
+ THttpServer srv(&server, options);
+ UNIT_ASSERT(srv.Start());
+
+ for (bool keepAlive : trueFalse) {
+ server.ResetCounters();
+ TVector<TAutoPtr<IThreadFactory::IThread>> threads;
+
+ server.Busy(3);
+ server.BusyThread();
+
+ for (size_t i = 0; i < 3; ++i) {
+ auto func = [&server, port, keepAlive]() {
+ server.BusyThread();
+ THolder<TTestRequest> r = MakeHolder<TTestRequest>(port);
+ r->KeepAliveConnection = keepAlive;
+ r->Execute();
+ };
+ threads.push_back(SystemThreadFactory()->Run(func));
+ }
+
+ server.FreeThread(); // all threads get connection & go to processing
+ Sleep(TDuration::MilliSeconds(100));
+ server.BusyThread(); // we wait while connections are established by the
+ // system and accepted by the server
+ server.Free(3); // we release all connections processing
+
+ for (auto&& thread : threads) {
+ thread->Join();
+ }
+
+ server.Free(3);
+ server.FreeThread();
+
+ UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount()));
+ UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount()));
+ }
+ }
+#endif
+}
diff --git a/library/cpp/http/server/options.cpp b/library/cpp/http/server/options.cpp
new file mode 100644
index 00000000000..05c954384a2
--- /dev/null
+++ b/library/cpp/http/server/options.cpp
@@ -0,0 +1,43 @@
+#include "options.h"
+
+#include <util/string/cast.h>
+#include <util/digest/numeric.h>
+#include <util/network/ip.h>
+#include <util/network/socket.h>
+#include <util/generic/hash_set.h>
+#include <util/generic/yexception.h>
+
+using TAddr = THttpServerOptions::TAddr;
+
+static inline TString AddrToString(const TAddr& addr) {
+ return addr.Addr + ":" + ToString(addr.Port);
+}
+
+static inline TNetworkAddress ToNetworkAddr(const TString& address, ui16 port) {
+ if (address.empty() || address == TStringBuf("*")) {
+ return TNetworkAddress(port);
+ }
+
+ return TNetworkAddress(address, port);
+}
+
+void THttpServerOptions::BindAddresses(TBindAddresses& ret) const {
+ THashSet<TString> check;
+
+ for (auto addr : BindSockaddr) {
+ if (!addr.Port) {
+ addr.Port = Port;
+ }
+
+ const TString straddr = AddrToString(addr);
+
+ if (check.find(straddr) == check.end()) {
+ check.insert(straddr);
+ ret.push_back(ToNetworkAddr(addr.Addr, addr.Port));
+ }
+ }
+
+ if (ret.empty()) {
+ ret.push_back(Host ? TNetworkAddress(Host, Port) : TNetworkAddress(Port));
+ }
+}
diff --git a/library/cpp/http/server/options.h b/library/cpp/http/server/options.h
new file mode 100644
index 00000000000..38eda0e5e78
--- /dev/null
+++ b/library/cpp/http/server/options.h
@@ -0,0 +1,176 @@
+#pragma once
+
+#include <util/network/ip.h>
+#include <util/network/init.h>
+#include <util/network/address.h>
+#include <util/generic/size_literals.h>
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+#include <util/datetime/base.h>
+
+class THttpServerOptions {
+public:
+ inline THttpServerOptions(ui16 port = 17000) noexcept
+ : Port(port)
+ {
+ }
+
+ using TBindAddresses = TVector<TNetworkAddress>;
+ void BindAddresses(TBindAddresses& ret) const;
+
+ inline THttpServerOptions& AddBindAddress(const TString& address, ui16 port) {
+ const TAddr addr = {
+ address,
+ port,
+ };
+
+ BindSockaddr.push_back(addr);
+ return *this;
+ }
+
+ inline THttpServerOptions& AddBindAddress(const TString& address) {
+ return AddBindAddress(address, 0);
+ }
+
+ inline THttpServerOptions& EnableKeepAlive(bool enable) noexcept {
+ KeepAliveEnabled = enable;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& EnableCompression(bool enable) noexcept {
+ CompressionEnabled = enable;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& EnableRejectExcessConnections(bool enable) noexcept {
+ RejectExcessConnections = enable;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& EnableReusePort(bool enable) noexcept {
+ ReusePort = enable;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& EnableReuseAddress(bool enable) noexcept {
+ ReuseAddress = enable;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& SetThreads(ui32 threads) noexcept {
+ nThreads = threads;
+
+ return *this;
+ }
+
+ /// Default interface name to bind the server. Used when none of BindAddress are provided.
+ inline THttpServerOptions& SetHost(const TString& host) noexcept {
+ Host = host;
+
+ return *this;
+ }
+
+ /// Default port to bind the server. Used when none of BindAddress are provided.
+ inline THttpServerOptions& SetPort(ui16 port) noexcept {
+ Port = port;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& SetMaxConnections(ui32 mc = 0) noexcept {
+ MaxConnections = mc;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& SetMaxQueueSize(ui32 mqs = 0) noexcept {
+ MaxQueueSize = mqs;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& SetClientTimeout(const TDuration& timeout) noexcept {
+ ClientTimeout = timeout;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& SetListenBacklog(int val) noexcept {
+ ListenBacklog = val;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& SetOutputBufferSize(size_t val) noexcept {
+ OutputBufferSize = val;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& SetMaxInputContentLength(ui64 val) noexcept {
+ MaxInputContentLength = val;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& SetMaxRequestsPerConnection(size_t val) noexcept {
+ MaxRequestsPerConnection = val;
+
+ return *this;
+ }
+
+ /// Use TElasticQueue instead of TThreadPool for request queues
+ inline THttpServerOptions& EnableElasticQueues(bool enable) noexcept {
+ UseElasticQueues = enable;
+
+ return *this;
+ }
+
+ inline THttpServerOptions& SetThreadsName(const TString& listenThreadName, const TString& requestsThreadName, const TString& failRequestsThreadName) noexcept {
+ ListenThreadName = listenThreadName;
+ RequestsThreadName = requestsThreadName;
+ FailRequestsThreadName = failRequestsThreadName;
+
+ return *this;
+ }
+
+ struct TAddr {
+ TString Addr;
+ ui16 Port;
+ };
+
+ typedef TVector<TAddr> TAddrs;
+
+ bool KeepAliveEnabled = true;
+ bool CompressionEnabled = false;
+ bool RejectExcessConnections = false;
+ bool ReusePort = false; // set SO_REUSEPORT socket option
+ bool ReuseAddress = true; // set SO_REUSEADDR socket option
+ TAddrs BindSockaddr;
+ ui16 Port = 17000; // The port on which to run the web server
+ TString Host; // DNS entry
+ const char* ServerName = "YWS/1.0"; // The Web server name to return in HTTP headers
+ ui32 nThreads = 0; // Thread count for requests processing
+ ui32 MaxQueueSize = 0; // Max allowed request count in queue
+ ui32 nFThreads = 1;
+ ui32 MaxFQueueSize = 0;
+ ui32 MaxConnections = 100;
+ int ListenBacklog = SOMAXCONN;
+ TDuration ClientTimeout;
+ size_t OutputBufferSize = 0;
+ ui64 MaxInputContentLength = sizeof(size_t) <= 4 ? 2_GB : 64_GB;
+ size_t MaxRequestsPerConnection = 0; // If keep-alive is enabled, request limit before connection is closed
+ bool UseElasticQueues = false;
+
+ TDuration PollTimeout; // timeout of TSocketPoller::WaitT call
+ TDuration ExpirationTimeout; // drop inactive connections after ExpirationTimeout (should be > 0)
+
+ TString ListenThreadName = "HttpListen";
+ TString RequestsThreadName = "HttpServer";
+ TString FailRequestsThreadName = "HttpServer";
+};
diff --git a/library/cpp/http/server/response.cpp b/library/cpp/http/server/response.cpp
new file mode 100644
index 00000000000..52d64c91cec
--- /dev/null
+++ b/library/cpp/http/server/response.cpp
@@ -0,0 +1,65 @@
+#include "response.h"
+
+#include <util/stream/output.h>
+#include <util/stream/mem.h>
+#include <util/string/cast.h>
+
+THttpResponse& THttpResponse::AddMultipleHeaders(const THttpHeaders& headers) {
+ for (THttpHeaders::TConstIterator i = headers.Begin(); i != headers.End(); ++i) {
+ this->Headers.AddHeader(*i);
+ }
+ return *this;
+}
+
+THttpResponse& THttpResponse::SetContentType(const TStringBuf& contentType) {
+ Headers.AddOrReplaceHeader(THttpInputHeader("Content-Type", ToString(contentType)));
+
+ return *this;
+}
+
+void THttpResponse::OutTo(IOutputStream& os) const {
+ TVector<IOutputStream::TPart> parts;
+ const size_t FIRST_LINE_PARTS = 3;
+ const size_t HEADERS_PARTS = Headers.Count() * 4;
+ const size_t CONTENT_PARTS = 5;
+ parts.reserve(FIRST_LINE_PARTS + HEADERS_PARTS + CONTENT_PARTS);
+
+ // first line
+ parts.push_back(IOutputStream::TPart(TStringBuf("HTTP/1.1 ")));
+ parts.push_back(IOutputStream::TPart(HttpCodeStrEx(Code)));
+ parts.push_back(IOutputStream::TPart::CrLf());
+
+ // headers
+ for (THttpHeaders::TConstIterator i = Headers.Begin(); i != Headers.End(); ++i) {
+ parts.push_back(IOutputStream::TPart(i->Name()));
+ parts.push_back(IOutputStream::TPart(TStringBuf(": ")));
+ parts.push_back(IOutputStream::TPart(i->Value()));
+ parts.push_back(IOutputStream::TPart::CrLf());
+ }
+
+ char buf[50];
+
+ if (!Content.empty()) {
+ TMemoryOutput mo(buf, sizeof(buf));
+
+ mo << Content.size();
+
+ parts.push_back(IOutputStream::TPart(TStringBuf("Content-Length: ")));
+ parts.push_back(IOutputStream::TPart(buf, mo.Buf() - buf));
+ parts.push_back(IOutputStream::TPart::CrLf());
+ }
+
+ // content
+ parts.push_back(IOutputStream::TPart::CrLf());
+
+ if (!Content.empty()) {
+ parts.push_back(IOutputStream::TPart(Content));
+ }
+
+ os.Write(parts.data(), parts.size());
+}
+
+template <>
+void Out<THttpResponse>(IOutputStream& os, const THttpResponse& resp) {
+ resp.OutTo(os);
+}
diff --git a/library/cpp/http/server/response.h b/library/cpp/http/server/response.h
new file mode 100644
index 00000000000..a75cb85605f
--- /dev/null
+++ b/library/cpp/http/server/response.h
@@ -0,0 +1,82 @@
+#pragma once
+
+#include <library/cpp/http/misc/httpcodes.h>
+#include <library/cpp/http/io/stream.h>
+
+#include <util/generic/strbuf.h>
+#include <util/string/cast.h>
+
+class THttpHeaders;
+class IOutputStream;
+
+class THttpResponse {
+public:
+ THttpResponse() noexcept
+ : Code(HTTP_OK)
+ {
+ }
+
+ explicit THttpResponse(HttpCodes code) noexcept
+ : Code(code)
+ {
+ }
+
+ template <typename ValueType>
+ THttpResponse& AddHeader(const TString& name, const ValueType& value) {
+ return AddHeader(THttpInputHeader(name, ToString(value)));
+ }
+
+ THttpResponse& AddHeader(const THttpInputHeader& header) {
+ Headers.AddHeader(header);
+
+ return *this;
+ }
+
+ THttpResponse& AddMultipleHeaders(const THttpHeaders& headers);
+
+ const THttpHeaders& GetHeaders() const {
+ return Headers;
+ }
+
+ THttpResponse& SetContentType(const TStringBuf& contentType);
+
+ /**
+ * @note If @arg content isn't empty its size is automatically added as a
+ * "Content-Length" header during output to IOutputStream.
+ * @see IOutputStream& operator << (IOutputStream&, const THttpResponse&)
+ */
+ THttpResponse& SetContent(const TString& content) {
+ Content = content;
+
+ return *this;
+ }
+
+ TString GetContent() const {
+ return Content;
+ }
+
+ /**
+ * @note If @arg content isn't empty its size is automatically added as a
+ * "Content-Length" header during output to IOutputStream.
+ * @see IOutputStream& operator << (IOutputStream&, const THttpResponse&)
+ */
+ THttpResponse& SetContent(const TString& content, const TStringBuf& contentType) {
+ return SetContent(content).SetContentType(contentType);
+ }
+
+ HttpCodes HttpCode() const {
+ return Code;
+ }
+
+ THttpResponse& SetHttpCode(HttpCodes code) {
+ Code = code;
+ return *this;
+ }
+
+ void OutTo(IOutputStream& out) const;
+
+private:
+ HttpCodes Code;
+ THttpHeaders Headers;
+ TString Content;
+};
diff --git a/library/cpp/http/server/response_ut.cpp b/library/cpp/http/server/response_ut.cpp
new file mode 100644
index 00000000000..73e2112ad36
--- /dev/null
+++ b/library/cpp/http/server/response_ut.cpp
@@ -0,0 +1,142 @@
+#include "response.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/string/cast.h>
+
+Y_UNIT_TEST_SUITE(TestHttpResponse) {
+ Y_UNIT_TEST(TestCodeOnly) {
+ UNIT_ASSERT_STRINGS_EQUAL(ToString(THttpResponse()), "HTTP/1.1 200 Ok\r\n\r\n");
+ UNIT_ASSERT_STRINGS_EQUAL(ToString(THttpResponse(HTTP_NOT_FOUND)), "HTTP/1.1 404 Not found\r\n\r\n");
+ }
+
+ Y_UNIT_TEST(TestRedirect) {
+ THttpResponse resp = THttpResponse(HTTP_FOUND).AddHeader("Location", "yandex.ru");
+ UNIT_ASSERT_STRINGS_EQUAL(ToString(resp), "HTTP/1.1 302 Moved temporarily\r\n"
+ "Location: yandex.ru\r\n"
+ "\r\n");
+ }
+
+ Y_UNIT_TEST(TestAddHeader) {
+ THttpResponse resp(HTTP_FORBIDDEN);
+ resp.AddHeader(THttpInputHeader("X-Header-1", "ValueOne"));
+ resp.AddHeader("X-Header-2", 10);
+ resp.AddHeader("X-Header-3", true);
+
+ const char* EXPECTED = "HTTP/1.1 403 Forbidden\r\n"
+ "X-Header-1: ValueOne\r\n"
+ "X-Header-2: 10\r\n"
+ "X-Header-3: 1\r\n"
+ "\r\n";
+ UNIT_ASSERT_STRINGS_EQUAL(ToString(resp), EXPECTED);
+ }
+
+ Y_UNIT_TEST(TestAddMultipleHeaders) {
+ THttpHeaders headers;
+ headers.AddHeader(THttpInputHeader("X-Header-1", "ValueOne"));
+ headers.AddHeader(THttpInputHeader("X-Header-2", "ValueTwo"));
+ headers.AddHeader(THttpInputHeader("X-Header-3", "ValueThree"));
+
+ const char* EXPECTED = "HTTP/1.1 403 Forbidden\r\n"
+ "X-Header-1: ValueOne\r\n"
+ "X-Header-2: ValueTwo\r\n"
+ "X-Header-3: ValueThree\r\n"
+ "\r\n";
+ UNIT_ASSERT_STRINGS_EQUAL(ToString(THttpResponse(HTTP_FORBIDDEN).AddMultipleHeaders(headers)),
+ EXPECTED);
+ }
+
+ Y_UNIT_TEST(TestGetHeaders) {
+ THttpResponse resp(HTTP_FORBIDDEN);
+
+ THttpHeaders headers;
+ headers.AddHeader(THttpInputHeader("X-Header-1", "ValueOne"));
+ headers.AddHeader(THttpInputHeader("X-Header-2", "ValueTwo"));
+ headers.AddHeader(THttpInputHeader("X-Header-3", "ValueThree"));
+ resp.AddMultipleHeaders(headers);
+ resp.AddHeader("X-Header-4", "ValueFour");
+
+ const THttpHeaders& gotHeaders = resp.GetHeaders();
+ UNIT_ASSERT_VALUES_EQUAL(gotHeaders.Count(), 4);
+ UNIT_ASSERT(gotHeaders.HasHeader("X-Header-1"));
+ UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-1")->Value(), "ValueOne");
+ UNIT_ASSERT(gotHeaders.HasHeader("X-Header-4"));
+ UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-4")->Value(), "ValueFour");
+ }
+
+
+ Y_UNIT_TEST(TestSetContent) {
+ const char* EXPECTED = "HTTP/1.1 200 Ok\r\n"
+ "Content-Length: 10\r\n"
+ "\r\n"
+ "0123456789";
+ UNIT_ASSERT_STRINGS_EQUAL(ToString(THttpResponse().SetContent("0123456789")),
+ EXPECTED);
+ }
+
+ Y_UNIT_TEST(TestSetContentWithContentType) {
+ const char* EXPECTED = "HTTP/1.1 200 Ok\r\n"
+ "Content-Type: text/xml\r\n"
+ "Content-Length: 28\r\n"
+ "\r\n"
+ "<xml><tag value=\"1\" /></xml>";
+ THttpResponse resp;
+ resp.SetContent("<xml><tag value=\"1\" /></xml>").SetContentType("text/xml");
+ UNIT_ASSERT_STRINGS_EQUAL(ToString(resp), EXPECTED);
+ }
+
+ Y_UNIT_TEST(TestCopyConstructor) {
+ THttpResponse resp(HTTP_FORBIDDEN);
+ resp.AddHeader(THttpInputHeader("X-Header-1", "ValueOne"))
+ .AddHeader("X-Header-2", "ValueTwo")
+ .AddHeader(THttpInputHeader("X-Header-3", "ValueThree"))
+ .SetContent("Some stuff")
+ .SetContentType("text/plain");
+
+ THttpResponse copy = resp;
+ UNIT_ASSERT_STRINGS_EQUAL(ToString(copy), ToString(resp));
+ }
+
+ Y_UNIT_TEST(TestAssignment) {
+ THttpResponse resp(HTTP_FORBIDDEN);
+ resp.AddHeader(THttpInputHeader("X-Header-1", "ValueOne"));
+ resp.AddHeader(THttpInputHeader("X-Header-2", "ValueTwo"));
+ resp.AddHeader(THttpInputHeader("X-Header-3", "ValueThree"));
+ resp.SetContent("Some stuff").SetContentType("text/plain");
+
+ THttpResponse copy;
+ copy = resp;
+ UNIT_ASSERT_STRINGS_EQUAL(ToString(copy), ToString(resp));
+ }
+
+ Y_UNIT_TEST(TestEmptyContent) {
+ UNIT_ASSERT_STRINGS_EQUAL(ToString(THttpResponse().SetContent("")), "HTTP/1.1 200 Ok\r\n\r\n");
+ }
+
+ Y_UNIT_TEST(TestReturnReference) {
+ THttpResponse resp;
+ UNIT_ASSERT_EQUAL(&resp, &resp.AddHeader("Header1", 1));
+ UNIT_ASSERT_EQUAL(&resp, &resp.AddHeader(THttpInputHeader("Header2", "2")));
+
+ THttpHeaders headers;
+ headers.AddHeader(THttpInputHeader("Header3", "3"));
+ headers.AddHeader(THttpInputHeader("Header4", "4"));
+ UNIT_ASSERT_EQUAL(&resp, &resp.AddMultipleHeaders(headers));
+
+ UNIT_ASSERT_EQUAL(&resp, &resp.SetContent("some stuff"));
+ UNIT_ASSERT_EQUAL(&resp, &resp.SetContent("some other stuff").SetContentType("text/plain"));
+ }
+
+ Y_UNIT_TEST(TestSetContentType) {
+ const char* EXPECTED = "HTTP/1.1 200 Ok\r\n"
+ "Content-Type: text/xml\r\n"
+ "Content-Length: 28\r\n"
+ "\r\n"
+ "<xml><tag value=\"1\" /></xml>";
+ THttpResponse resp;
+ resp.SetContent("<xml><tag value=\"1\" /></xml>")
+ .SetContentType("application/json")
+ .SetContentType("text/xml");
+ UNIT_ASSERT_STRINGS_EQUAL(ToString(resp), EXPECTED);
+ }
+}
diff --git a/library/cpp/http/server/ut/ya.make b/library/cpp/http/server/ut/ya.make
new file mode 100644
index 00000000000..bcb4d4c0b88
--- /dev/null
+++ b/library/cpp/http/server/ut/ya.make
@@ -0,0 +1,12 @@
+UNITTEST_FOR(library/cpp/http/server)
+
+OWNER(pg)
+
+SIZE(MEDIUM)
+
+SRCS(
+ http_ut.cpp
+ response_ut.cpp
+)
+
+END()
diff --git a/library/cpp/http/server/ya.make b/library/cpp/http/server/ya.make
new file mode 100644
index 00000000000..bae6f33306b
--- /dev/null
+++ b/library/cpp/http/server/ya.make
@@ -0,0 +1,27 @@
+LIBRARY()
+
+OWNER(
+ pg
+ mvel
+ kulikov
+ g:base
+ g:middle
+)
+
+SRCS(
+ conn.cpp
+ http.cpp
+ http_ex.cpp
+ options.cpp
+ response.cpp
+)
+
+PEERDIR(
+ library/cpp/http/misc
+ library/cpp/http/io
+ library/cpp/threading/equeue
+)
+
+END()
+
+RECURSE_FOR_TESTS(ut)