diff options
author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/http/server |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/http/server')
-rw-r--r-- | library/cpp/http/server/conn.cpp | 69 | ||||
-rw-r--r-- | library/cpp/http/server/conn.h | 37 | ||||
-rw-r--r-- | library/cpp/http/server/http.cpp | 843 | ||||
-rw-r--r-- | library/cpp/http/server/http.h | 176 | ||||
-rw-r--r-- | library/cpp/http/server/http_ex.cpp | 107 | ||||
-rw-r--r-- | library/cpp/http/server/http_ex.h | 28 | ||||
-rw-r--r-- | library/cpp/http/server/http_ut.cpp | 739 | ||||
-rw-r--r-- | library/cpp/http/server/options.cpp | 43 | ||||
-rw-r--r-- | library/cpp/http/server/options.h | 176 | ||||
-rw-r--r-- | library/cpp/http/server/response.cpp | 65 | ||||
-rw-r--r-- | library/cpp/http/server/response.h | 82 | ||||
-rw-r--r-- | library/cpp/http/server/response_ut.cpp | 142 | ||||
-rw-r--r-- | library/cpp/http/server/ut/ya.make | 12 | ||||
-rw-r--r-- | library/cpp/http/server/ya.make | 27 |
14 files changed, 2546 insertions, 0 deletions
diff --git a/library/cpp/http/server/conn.cpp b/library/cpp/http/server/conn.cpp new file mode 100644 index 00000000000..38a76c4c309 --- /dev/null +++ b/library/cpp/http/server/conn.cpp @@ -0,0 +1,69 @@ +#include "conn.h" + +#include <util/network/socket.h> +#include <util/stream/buffered.h> + +class THttpServerConn::TImpl { +public: + inline TImpl(const TSocket& s, size_t outputBufferSize) + : S_(s) + , SI_(S_) + , SO_(S_) + , BO_(&SO_, outputBufferSize) + , HI_(&SI_) + , HO_(&BO_, &HI_) + { + } + + inline ~TImpl() { + } + + inline THttpInput* Input() noexcept { + return &HI_; + } + + inline THttpOutput* Output() noexcept { + return &HO_; + } + + inline void Reset() { + if (S_ != INVALID_SOCKET) { + // send RST packet to client + S_.SetLinger(true, 0); + S_.Close(); + } + } + +private: + TSocket S_; + TSocketInput SI_; + TSocketOutput SO_; + TBufferedOutput BO_; + THttpInput HI_; + THttpOutput HO_; +}; + +THttpServerConn::THttpServerConn(const TSocket& s) + : THttpServerConn(s, s.MaximumTransferUnit()) +{ +} + +THttpServerConn::THttpServerConn(const TSocket& s, size_t outputBufferSize) + : Impl_(new TImpl(s, outputBufferSize)) +{ +} + +THttpServerConn::~THttpServerConn() { +} + +THttpInput* THttpServerConn::Input() noexcept { + return Impl_->Input(); +} + +THttpOutput* THttpServerConn::Output() noexcept { + return Impl_->Output(); +} + +void THttpServerConn::Reset() { + return Impl_->Reset(); +} diff --git a/library/cpp/http/server/conn.h b/library/cpp/http/server/conn.h new file mode 100644 index 00000000000..3aa5329af42 --- /dev/null +++ b/library/cpp/http/server/conn.h @@ -0,0 +1,37 @@ +#pragma once + +#include <library/cpp/http/io/stream.h> +#include <util/generic/ptr.h> + +class TSocket; + +/// Потоки ввода/вывода для получения запросов и отправки ответов HTTP-сервера. +class THttpServerConn { +public: + explicit THttpServerConn(const TSocket& s); + THttpServerConn(const TSocket& s, size_t outputBufferSize); + ~THttpServerConn(); + + THttpInput* Input() noexcept; + THttpOutput* Output() noexcept; + + inline const THttpInput* Input() const noexcept { + return const_cast<THttpServerConn*>(this)->Input(); + } + + inline const THttpOutput* Output() const noexcept { + return const_cast<THttpServerConn*>(this)->Output(); + } + + /// Проверяет, можно ли установить режим, при котором соединение с сервером + /// не завершается после окончания транзакции. + inline bool CanBeKeepAlive() const noexcept { + return Output()->CanBeKeepAlive(); + } + + void Reset(); + +private: + class TImpl; + THolder<TImpl> Impl_; +}; diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp new file mode 100644 index 00000000000..128583bdd70 --- /dev/null +++ b/library/cpp/http/server/http.cpp @@ -0,0 +1,843 @@ +#include "http.h" +#include "http_ex.h" + +#include <library/cpp/threading/equeue/equeue.h> + +#include <util/generic/buffer.h> +#include <util/generic/cast.h> +#include <util/generic/intrlist.h> +#include <util/generic/yexception.h> +#include <util/network/address.h> +#include <util/network/socket.h> +#include <util/network/poller.h> +#include <util/system/atomic.h> +#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy +#include <util/system/defaults.h> +#include <util/system/event.h> +#include <util/system/mutex.h> +#include <util/system/pipe.h> +#include <util/system/thread.h> +#include <util/thread/factory.h> + +#include <cerrno> +#include <cstring> +#include <ctime> + +#include <sys/stat.h> +#include <sys/types.h> + +using namespace NAddr; + +namespace { + class IPollAble { + public: + inline IPollAble() noexcept { + } + + virtual ~IPollAble() { + } + + virtual void OnPollEvent(TInstant now) = 0; + }; + + struct TShouldStop { + }; + + struct TWakeupPollAble: public IPollAble { + void OnPollEvent(TInstant) override { + throw TShouldStop(); + } + }; +} + +class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> { +public: + TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef); + ~TClientConnection() override; + + void OnPollEvent(TInstant now) override; + + inline void Activate(TInstant now) noexcept; + inline void DeActivate(); + inline void Reject(); + +public: + TSocket Socket_; + NAddr::IRemoteAddrRef ListenerSockAddrRef_; + THttpServer::TImpl* HttpServ_ = nullptr; + bool Reject_ = false; + TInstant LastUsed; + TInstant AcceptMoment; + size_t ReceivedRequests = 0; +}; + +class THttpServer::TImpl { +public: + class TConnections { + public: + inline TConnections(TSocketPoller* poller, const THttpServerOptions& options) + : Poller_(poller) + , Options(options) + { + } + + inline ~TConnections() { + } + + inline void Add(TClientConnection* c) noexcept { + TGuard<TMutex> g(Mutex_); + + Conns_.PushBack(c); + Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c)); + } + + inline void Erase(TClientConnection* c, TInstant now) noexcept { + TGuard<TMutex> g(Mutex_); + EraseUnsafe(c); + if (Options.ExpirationTimeout > TDuration::Zero()) { + TryRemovingUnsafe(now - Options.ExpirationTimeout); + } + } + + inline void Clear() noexcept { + TGuard<TMutex> g(Mutex_); + + Conns_.Clear(); + } + + inline bool RemoveOld(TInstant border) noexcept { + TGuard<TMutex> g(Mutex_); + return TryRemovingUnsafe(border); + } + + bool TryRemovingUnsafe(TInstant border) noexcept { + if (Conns_.Empty()) { + return false; + } + TClientConnection* c = &*(Conns_.Begin()); + if (c->LastUsed > border) { + return false; + } + EraseUnsafe(c); + delete c; + return true; + } + + void EraseUnsafe(TClientConnection* c) noexcept { + Poller_->Unwait(c->Socket_); + c->Unlink(); + } + + public: + TMutex Mutex_; + TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_; + TSocketPoller* Poller_ = nullptr; + const THttpServerOptions& Options; + }; + + static void* ListenSocketFunction(void* param) { + try { + ((TImpl*)param)->ListenSocket(); + } catch (...) { + + } + + return nullptr; + } + + TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) { + THolder<TClientRequest> obj(Cb_->CreateClient()); + + obj->Conn_.Reset(c.Release()); + + return obj; + } + + void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) { + if (MaxRequestsReached()) { + Cb_->OnMaxConn(); + bool wasRemoved = Connections->RemoveOld(TInstant::Max()); + if (!wasRemoved && Options_.RejectExcessConnections) { + (new TClientConnection(s, this, listenerSockAddrRef))->Reject(); + return; + } + } + + auto connection = new TClientConnection(s, this, listenerSockAddrRef); + connection->LastUsed = now; + connection->DeActivate(); + } + + void SaveErrorCode() { + ErrorCode = WSAGetLastError(); + } + + int GetErrorCode() const { + return ErrorCode; + } + + const char* GetError() const { + return LastSystemErrorText(ErrorCode); + } + + bool Start() { + Poller.Reset(new TSocketPoller()); + Connections.Reset(new TConnections(Poller.Get(), Options_)); + + // Start the listener thread + ListenerRunningOK = false; + + // throws on error + TPipeHandle::Pipe(ListenWakeupReadFd, ListenWakeupWriteFd); + + SetNonBlock(ListenWakeupWriteFd, true); + SetNonBlock(ListenWakeupReadFd, true); + + Poller->WaitRead(ListenWakeupReadFd, &WakeupPollAble); + + ListenStartEvent.Reset(); + try { + ListenThread.Reset(new TThread(ListenSocketFunction, this)); + ListenThread->Start(); + } catch (const yexception&) { + SaveErrorCode(); + return false; + } + + // Wait until the thread has completely started and return the success indicator + ListenStartEvent.Wait(); + + return ListenerRunningOK; + } + + void Wait() { + Cb_->OnWait(); + TGuard<TMutex> g(StopMutex); + if (ListenThread) { + ListenThread->Join(); + ListenThread.Reset(nullptr); + } + } + + void Stop() { + Shutdown(); + + TGuard<TMutex> g(StopMutex); + if (ListenThread) { + ListenThread->Join(); + ListenThread.Reset(nullptr); + } + + while (ConnectionCount) { + usleep(10000); + Connections->Clear(); + } + + Connections.Destroy(); + Poller.Destroy(); + } + + void Shutdown() { + ListenWakeupWriteFd.Write("", 1); + // ignore result + } + + void AddRequest(TAutoPtr<TClientRequest> req, bool fail) { + struct TFailRequest: public THttpClientRequestEx { + inline TFailRequest(TAutoPtr<TClientRequest> parent) { + Conn_.Reset(parent->Conn_.Release()); + HttpConn_.Reset(parent->HttpConn_.Release()); + } + + bool Reply(void*) override { + if (!ProcessHeaders()) { + return true; + } + + ProcessFailRequest(0); + return true; + } + }; + + if (!fail && Requests->Add(req.Get())) { + Y_UNUSED(req.Release()); + } else { + req = new TFailRequest(req); + + if (FailRequests->Add(req.Get())) { + Y_UNUSED(req.Release()); + } else { + Cb_->OnFailRequest(-1); + } + } + } + + size_t GetRequestQueueSize() const { + return Requests->Size(); + } + + size_t GetFailQueueSize() const { + return FailRequests->Size(); + } + + const IThreadPool& GetRequestQueue() const { + return *Requests; + } + + const IThreadPool& GetFailQueue() const { + return *FailRequests; + } + + class TListenSocket: public IPollAble, public TIntrusiveListItem<TListenSocket> { + public: + inline TListenSocket(const TSocket& s, TImpl* parent) + : S_(s) + , Server_(parent) + , SockAddrRef_(GetSockAddr(S_)) + { + } + + ~TListenSocket() override { + } + + void OnPollEvent(TInstant) override { + SOCKET s = ::accept(S_, nullptr, nullptr); + + if (s == INVALID_SOCKET) { + ythrow yexception() << "accept: " << LastSystemErrorText(); + } + + Server_->AddRequestFromSocket(s, TInstant::Now(), SockAddrRef_); + } + + SOCKET GetSocket() const noexcept { + return S_; + } + + private: + TSocket S_; + TImpl* Server_ = nullptr; + NAddr::IRemoteAddrRef SockAddrRef_; + }; + + void ListenSocket() { + TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str()); + + ErrorCode = 0; + TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs; + + std::function<void(TSocket)> callback = [&](TSocket socket) { + THolder<TListenSocket> ls(new TListenSocket(socket, this)); + Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get())); + Reqs.PushBack(ls.Release()); + }; + bool addressesBound = TryToBindAddresses(Options_, &callback); + if (!addressesBound) { + SaveErrorCode(); + ListenStartEvent.Signal(); + + return; + } + + Requests->Start(Options_.nThreads, Options_.MaxQueueSize); + FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize); + Cb_->OnListenStart(); + ListenerRunningOK = true; + ListenStartEvent.Signal(); + + TVector<void*> events; + events.resize(1); + + TInstant now = TInstant::Now(); + for (;;) { + try { + const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout; + const size_t ret = Poller->WaitD(events.data(), events.size(), deadline); + + now = TInstant::Now(); + for (size_t i = 0; i < ret; ++i) { + ((IPollAble*)events[i])->OnPollEvent(now); + } + + if (ret == 0 && Options_.ExpirationTimeout > TDuration::Zero()) { + Connections->RemoveOld(now - Options_.ExpirationTimeout); + } + + // When MaxConnections is limited or ExpirationTimeout is set, OnPollEvent can call + // RemoveOld and destroy other IPollAble* objects in the + // poller. Thus in this case we can safely process only one + // event from the poller at a time. + if (!Options_.MaxConnections && Options_.ExpirationTimeout == TDuration::Zero()) { + if (ret >= events.size()) { + events.resize(ret * 2); + } + } + } catch (const TShouldStop&) { + break; + } catch (...) { + Cb_->OnException(); + } + } + + while (!Reqs.Empty()) { + THolder<TListenSocket> ls(Reqs.PopFront()); + + Poller->Unwait(ls->GetSocket()); + } + + Requests->Stop(); + FailRequests->Stop(); + Cb_->OnListenStop(); + } + + void RestartRequestThreads(ui32 nTh, ui32 maxQS) { + Requests->Stop(); + Options_.nThreads = nTh; + Options_.MaxQueueSize = maxQS; + Requests->Start(Options_.nThreads, Options_.MaxQueueSize); + } + + TImpl(THttpServer* parent, ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options_) + : Requests(mainWorkers) + , FailRequests(failWorkers) + , Options_(options_) + , Cb_(cb) + , Parent_(parent) + { + } + + TImpl(THttpServer* parent, ICallBack* cb, const TOptions& options, IThreadFactory* factory) + : TImpl( + parent, + cb, + MakeThreadPool<TSimpleThreadPool>(factory, options.UseElasticQueues, cb, options.RequestsThreadName), + MakeThreadPool<TThreadPool>(factory, options.UseElasticQueues, nullptr, options.FailRequestsThreadName), + options) { + } + + ~TImpl() { + try { + Stop(); + } catch (...) { + } + } + + inline const TOptions& Options() const noexcept { + return Options_; + } + + inline void DecreaseConnections() noexcept { + AtomicDecrement(ConnectionCount); + } + + inline void IncreaseConnections() noexcept { + AtomicIncrement(ConnectionCount); + } + + inline i64 GetClientCount() const { + return AtomicGet(ConnectionCount); + } + + inline bool MaxRequestsReached() const { + return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections); + } + + THolder<TThread> ListenThread; + TPipeHandle ListenWakeupReadFd; + TPipeHandle ListenWakeupWriteFd; + TSystemEvent ListenStartEvent; + TMtpQueueRef Requests; + TMtpQueueRef FailRequests; + TAtomic ConnectionCount = 0; + THolder<TSocketPoller> Poller; + THolder<TConnections> Connections; + bool ListenerRunningOK = false; + int ErrorCode = 0; + TOptions Options_; + ICallBack* Cb_ = nullptr; + THttpServer* Parent_ = nullptr; + TWakeupPollAble WakeupPollAble; + TMutex StopMutex; + +private: + template <class TThreadPool_> + static THolder<IThreadPool> MakeThreadPool(IThreadFactory* factory, bool elastic, ICallBack* callback = nullptr, const TString& threadName = {}) { + if (!factory) { + factory = SystemThreadFactory(); + } + + THolder<IThreadPool> pool; + const auto params = IThreadPool::TParams().SetFactory(factory).SetThreadName(threadName); + if (callback) { + pool = MakeHolder<TThreadPoolBinder<TThreadPool_, THttpServer::ICallBack>>(callback, params); + } else { + pool = MakeHolder<TThreadPool_>(params); + } + + if (elastic) { + pool = MakeHolder<TElasticQueue>(std::move(pool)); + } + + return pool; + } +}; + +THttpServer::THttpServer(ICallBack* cb, const TOptions& options, IThreadFactory* pool) + : Impl_(new TImpl(this, cb, options, pool)) +{ +} + +THttpServer::THttpServer(ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options) + : Impl_(new TImpl(this, cb, mainWorkers, failWorkers, options)) +{ +} + +THttpServer::~THttpServer() { +} + +i64 THttpServer::GetClientCount() const { + return Impl_->GetClientCount(); +} + +bool THttpServer::Start() { + return Impl_->Start(); +} + +void THttpServer::Stop() { + Impl_->Stop(); +} + +void THttpServer::Shutdown() { + Impl_->Shutdown(); +} + +void THttpServer::Wait() { + Impl_->Wait(); +} + +int THttpServer::GetErrorCode() { + return Impl_->GetErrorCode(); +} + +const char* THttpServer::GetError() { + return Impl_->GetError(); +} + +void THttpServer::RestartRequestThreads(ui32 n, ui32 queue) { + Impl_->RestartRequestThreads(n, queue); +} + +const THttpServer::TOptions& THttpServer::Options() const noexcept { + return Impl_->Options(); +} + +size_t THttpServer::GetRequestQueueSize() const { + return Impl_->GetRequestQueueSize(); +} + +size_t THttpServer::GetFailQueueSize() const { + return Impl_->GetFailQueueSize(); +} + +const IThreadPool& THttpServer::GetRequestQueue() const { + return Impl_->GetRequestQueue(); +} + +const IThreadPool& THttpServer::GetFailQueue() const { + return Impl_->GetFailQueue(); +} + +bool THttpServer::MaxRequestsReached() const { + return Impl_->MaxRequestsReached(); +} + +TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef) + : Socket_(s) + , ListenerSockAddrRef_(listenerSockAddrRef) + , HttpServ_(serv) +{ + SetNoDelay(Socket_, true); + + const TDuration& clientTimeout = HttpServ_->Options().ClientTimeout; + if (clientTimeout != TDuration::Zero()) { + SetSocketTimeout(Socket_, (long)clientTimeout.Seconds(), clientTimeout.MilliSecondsOfSecond()); + } + + HttpServ_->IncreaseConnections(); +} + +TClientConnection::~TClientConnection() { + HttpServ_->DecreaseConnections(); +} + +void TClientConnection::OnPollEvent(TInstant now) { + THolder<TClientConnection> this_(this); + Activate(now); + + { + char tmp[1]; + + if (::recv(Socket_, tmp, 1, MSG_PEEK) < 1) { + /* + * We can received a FIN so our socket was moved to + * TCP_CLOSE_WAIT state. Check it before adding work + * for this socket. + */ + + return; + } + } + + THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_)); + AcceptMoment = now; + + HttpServ_->AddRequest(obj, Reject_); +} + +void TClientConnection::Activate(TInstant now) noexcept { + HttpServ_->Connections->Erase(this, now); + LastUsed = now; + ++ReceivedRequests; +} + +void TClientConnection::DeActivate() { + HttpServ_->Connections->Add(this); +} + +void TClientConnection::Reject() { + Reject_ = true; + + HttpServ_->Connections->Add(this); +} + +TClientRequest::TClientRequest() { +} + +TClientRequest::~TClientRequest() { +} + +bool TClientRequest::Reply(void* /*ThreadSpecificResource*/) { + if (strnicmp(RequestString.data(), "GET ", 4)) { + Output() << "HTTP/1.0 501 Not Implemented\r\n\r\n"; + } else { + Output() << "HTTP/1.0 200 OK\r\n" + "Content-Type: text/html\r\n" + "\r\n" + "Hello World!\r\n"; + } + + return true; +} + +bool TClientRequest::IsLocal() const { + return HasLocalAddress(Socket()); +} + +bool TClientRequest::CheckLoopback() { + bool isLocal = false; + + try { + isLocal = IsLocal(); + } catch (const yexception& e) { + Output() << "HTTP/1.0 500 Oops\r\n\r\n" + << e.what() << "\r\n"; + return false; + } + + if (!isLocal) { + Output() << "HTTP/1.0 403 Permission denied\r\n" + "Content-Type: text/html; charset=windows-1251\r\n" + "Connection: close\r\n" + "\r\n" + "<html><head><title>Permission denied</title></head>" + "<body><h1>Permission denied</h1>" + "<p>This request must be sent from the localhost.</p>" + "</body></html>\r\n"; + + return false; + } + + return true; +} + +void TClientRequest::ReleaseConnection() { + if (Conn_ && HttpConn_ && HttpServ()->Options().KeepAliveEnabled && HttpConn_->CanBeKeepAlive() && (!HttpServ()->Options().RejectExcessConnections || !HttpServ()->MaxRequestsReached())) { + Output().Finish(); + Conn_->DeActivate(); + Y_UNUSED(Conn_.Release()); + } +} + +void TClientRequest::ResetConnection() { + if (HttpConn_) { + // send RST packet to client + HttpConn_->Reset(); + HttpConn_.Destroy(); + } +} + +void TClientRequest::Process(void* ThreadSpecificResource) { + THolder<TClientRequest> this_(this); + + auto* serverImpl = Conn_->HttpServ_; + + try { + if (!HttpConn_) { + const size_t outputBufferSize = HttpServ()->Options().OutputBufferSize; + if (outputBufferSize) { + HttpConn_.Reset(new THttpServerConn(Socket(), outputBufferSize)); + } else { + HttpConn_.Reset(new THttpServerConn(Socket())); + } + + auto maxRequestsPerConnection = HttpServ()->Options().MaxRequestsPerConnection; + HttpConn_->Output()->EnableKeepAlive(HttpServ()->Options().KeepAliveEnabled && (!maxRequestsPerConnection || Conn_->ReceivedRequests < maxRequestsPerConnection)); + HttpConn_->Output()->EnableCompression(HttpServ()->Options().CompressionEnabled); + } + + if (ParsedHeaders.empty()) { + RequestString = Input().FirstLine(); + + const THttpHeaders& h = Input().Headers(); + ParsedHeaders.reserve(h.Count()); + for (THttpHeaders::TConstIterator it = h.Begin(); it != h.End(); ++it) { + ParsedHeaders.emplace_back(it->Name(), it->Value()); + } + } + + if (Reply(ThreadSpecificResource)) { + ReleaseConnection(); + + /* + * *this will be destroyed... + */ + + return; + } + } catch (...) { + serverImpl->Cb_->OnException(); + + throw; + } + + Y_UNUSED(this_.Release()); +} + +void TClientRequest::ProcessFailRequest(int failstate) { + Output() << "HTTP/1.1 503 Service Unavailable\r\n" + "Content-Type: text/plain\r\n" + "Content-Length: 21\r\n" + "\r\n" + "Service Unavailable\r\n"; + + TString url; + + if (!strnicmp(RequestString.data(), "GET ", 4)) { + // Trying to extract url... + const char* str = RequestString.data(); + + // Skipping spaces before url... + size_t start = 3; + while (str[start] == ' ') { + ++start; + } + + if (str[start]) { + // Traversing url... + size_t idx = start; + + while (str[idx] != ' ' && str[idx]) { + ++idx; + } + + url = RequestString.substr(start, idx - start); + } + } + + const THttpServer::ICallBack::TFailLogData d = { + failstate, + url, + }; + + // Handling failure... + Conn_->HttpServ_->Cb_->OnFailRequestEx(d); + Output().Flush(); +} + +THttpServer* TClientRequest::HttpServ() const noexcept { + return Conn_->HttpServ_->Parent_; +} + +const TSocket& TClientRequest::Socket() const noexcept { + return Conn_->Socket_; +} + +NAddr::IRemoteAddrRef TClientRequest::GetListenerSockAddrRef() const noexcept { + return Conn_->ListenerSockAddrRef_; +} + +TInstant TClientRequest::AcceptMoment() const noexcept { + return Conn_->AcceptMoment; +} + +/* + * TRequestReplier + */ +TRequestReplier::TRequestReplier() { +} + +TRequestReplier::~TRequestReplier() { +} + +bool TRequestReplier::Reply(void* threadSpecificResource) { + const TReplyParams params = { + threadSpecificResource, Input(), Output()}; + + return DoReply(params); +} + +bool TryToBindAddresses(const THttpServerOptions& options, const std::function<void(TSocket)>* callbackOnBoundAddress) { + THttpServerOptions::TBindAddresses addrs; + try { + options.BindAddresses(addrs); + } catch (const std::exception&) { + return false; + } + + for (const auto& na : addrs) { + for (TNetworkAddress::TIterator ai = na.Begin(); ai != na.End(); ++ai) { + NAddr::TAddrInfo addr(&*ai); + + TSocket socket(::socket(addr.Addr()->sa_family, SOCK_STREAM, 0)); + + if (socket == INVALID_SOCKET) { + return false; + } + + FixIPv6ListenSocket(socket); + + if (options.ReuseAddress) { + int yes = 1; + ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes)); + } + + if (options.ReusePort) { + SetReusePort(socket, true); + } + + if (::bind(socket, addr.Addr(), addr.Len()) == SOCKET_ERROR) { + return false; + } + + if (::listen(socket, options.ListenBacklog) == SOCKET_ERROR) { + return false; + } + + if (callbackOnBoundAddress != nullptr) { + (*callbackOnBoundAddress)(socket); + } + } + } + + return true; +} diff --git a/library/cpp/http/server/http.h b/library/cpp/http/server/http.h new file mode 100644 index 00000000000..b292d38f270 --- /dev/null +++ b/library/cpp/http/server/http.h @@ -0,0 +1,176 @@ +#pragma once + +#include "conn.h" +#include "options.h" + +#include <util/thread/pool.h> +#include <library/cpp/http/io/stream.h> +#include <util/memory/blob.h> +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/system/atomic.h> + +class IThreadFactory; +class TClientRequest; +class TClientConnection; + +class THttpServer { + friend class TClientRequest; + friend class TClientConnection; + +public: + class ICallBack { + public: + struct TFailLogData { + int failstate; + TString url; + }; + + virtual ~ICallBack() { + } + + virtual void OnFailRequest(int /*failstate*/) { + } + + virtual void OnFailRequestEx(const TFailLogData& d) { + OnFailRequest(d.failstate); + } + + virtual void OnException() { + } + + virtual void OnMaxConn() { + } + + virtual TClientRequest* CreateClient() = 0; + + virtual void OnListenStart() { + } + + virtual void OnListenStop() { + } + + virtual void OnWait() { + } + + virtual void* CreateThreadSpecificResource() { + return nullptr; + } + + virtual void DestroyThreadSpecificResource(void*) { + } + }; + + typedef THttpServerOptions TOptions; + typedef TSimpleSharedPtr<IThreadPool> TMtpQueueRef; + + THttpServer(ICallBack* cb, const TOptions& options = TOptions(), IThreadFactory* pool = nullptr); + THttpServer(ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options = TOptions()); + virtual ~THttpServer(); + + bool Start(); + + // shutdown a.s.a.p. + void Stop(); + + // graceful shutdown with serving all already open connections + void Shutdown(); + + void Wait(); + int GetErrorCode(); + const char* GetError(); + void RestartRequestThreads(ui32 nTh, ui32 maxQS); + const TOptions& Options() const noexcept; + i64 GetClientCount() const; + + class TImpl; + size_t GetRequestQueueSize() const; + size_t GetFailQueueSize() const; + + const IThreadPool& GetRequestQueue() const; + const IThreadPool& GetFailQueue() const; + + static TAtomicBase AcceptReturnsInvalidSocketCounter(); + +private: + bool MaxRequestsReached() const; + +private: + THolder<TImpl> Impl_; +}; + +/** + * @deprecated Use TRequestReplier instead + */ +class TClientRequest: public IObjectInQueue { + friend class THttpServer::TImpl; + +public: + TClientRequest(); + ~TClientRequest() override; + + inline THttpInput& Input() noexcept { + return *HttpConn_->Input(); + } + + inline THttpOutput& Output() noexcept { + return *HttpConn_->Output(); + } + + THttpServer* HttpServ() const noexcept; + const TSocket& Socket() const noexcept; + NAddr::IRemoteAddrRef GetListenerSockAddrRef() const noexcept; + TInstant AcceptMoment() const noexcept; + + bool IsLocal() const; + bool CheckLoopback(); + void ProcessFailRequest(int failstate); + + void ReleaseConnection(); + + void ResetConnection(); + +private: + /* + * Processes the request after 'connection' been created and 'Headers' been read + * Returns 'false' if the processing must be continued by the next handler, + * 'true' otherwise ('this' will be deleted) + */ + virtual bool Reply(void* ThreadSpecificResource); + void Process(void* ThreadSpecificResource) override; + +public: + TVector<std::pair<TString, TString>> ParsedHeaders; + TString RequestString; + +private: + THolder<TClientConnection> Conn_; + THolder<THttpServerConn> HttpConn_; +}; + +class TRequestReplier: public TClientRequest { +public: + TRequestReplier(); + ~TRequestReplier() override; + + struct TReplyParams { + void* ThreadSpecificResource; + THttpInput& Input; + THttpOutput& Output; + }; + + /* + * Processes the request after 'connection' been created and 'Headers' been read + * Returns 'false' if the processing must be continued by the next handler, + * 'true' otherwise ('this' will be deleted) + */ + virtual bool DoReply(const TReplyParams& params) = 0; + +private: + bool Reply(void* threadSpecificResource) final; + + using TClientRequest::Input; + using TClientRequest::Output; +}; + +bool TryToBindAddresses(const THttpServerOptions& options, const std::function<void(TSocket)>* callbackOnBoundAddress = nullptr); diff --git a/library/cpp/http/server/http_ex.cpp b/library/cpp/http/server/http_ex.cpp new file mode 100644 index 00000000000..e07db22bfc8 --- /dev/null +++ b/library/cpp/http/server/http_ex.cpp @@ -0,0 +1,107 @@ +#include "http_ex.h" + +#include <util/generic/buffer.h> +#include <util/generic/cast.h> +#include <util/stream/null.h> + +bool THttpClientRequestExtension::Parse(char* req, TBaseServerRequestData& rd) { + rd.SetSocket(Socket()); + + if (!rd.Parse(req)) { + Output() << "HTTP/1.1 403 Forbidden\r\n" + "Content-Type: text/plain\r\n" + "Content-Length: 39\r\n" + "\r\n" + "The server cannot be used as a proxy.\r\n"; + + return false; + } + + return true; +} + +bool THttpClientRequestExtension::ProcessHeaders(TBaseServerRequestData& rd, TBlob& postData) { + for (const auto& header : ParsedHeaders) { + rd.AddHeader(header.first, header.second); + } + + char* s = RequestString.begin(); + + enum EMethod { + NotImplemented, + Get, + Post, + Put, + Patch, + Delete, + }; + + enum EMethod foundMethod; + char* urlStart; + + if (strnicmp(s, "GET ", 4) == 0) { + foundMethod = Get; + urlStart = s + 4; + } else if (strnicmp(s, "POST ", 5) == 0) { + foundMethod = Post; + urlStart = s + 5; + } else if (strnicmp(s, "PUT ", 4) == 0) { + foundMethod = Put; + urlStart = s + 4; + } else if (strnicmp(s, "PATCH ", 6) == 0) { + foundMethod = Patch; + urlStart = s + 6; + } else if (strnicmp(s, "DELETE ", 7) == 0) { + foundMethod = Delete; + urlStart = s + 7; + } else { + foundMethod = NotImplemented; + } + + switch (foundMethod) { + case Get: + case Delete: + if (!Parse(urlStart, rd)) { + return false; + } + break; + + case Post: + case Put: + case Patch: + try { + ui64 contentLength = 0; + if (Input().HasExpect100Continue()) { + Output().SendContinue(); + } + + if (!Input().ContentEncoded() && Input().GetContentLength(contentLength)) { + if (contentLength > HttpServ()->Options().MaxInputContentLength) { + Output() << "HTTP/1.1 413 Payload Too Large\r\nContent-Length:0\r\n\r\n"; + Output().Finish(); + return false; + } + + TBuffer buf(SafeIntegerCast<size_t>(contentLength)); + buf.Resize(Input().Load(buf.Data(), (size_t)contentLength)); + postData = TBlob::FromBuffer(buf); + } else { + postData = TBlob::FromStream(Input()); + } + } catch (...) { + Output() << "HTTP/1.1 400 Bad request\r\n\r\n"; + return false; + } + + if (!Parse(urlStart, rd)) { + return false; + } + break; + + case NotImplemented: + Output() << "HTTP/1.1 501 Not Implemented\r\n\r\n"; + return false; + } + + return true; +} diff --git a/library/cpp/http/server/http_ex.h b/library/cpp/http/server/http_ex.h new file mode 100644 index 00000000000..1ef43ea4fd5 --- /dev/null +++ b/library/cpp/http/server/http_ex.h @@ -0,0 +1,28 @@ +#pragma once + +#include "http.h" + +#include <library/cpp/http/misc/httpreqdata.h> + +class THttpClientRequestExtension: public TClientRequest { +public: + bool Parse(char* req, TBaseServerRequestData& rd); + bool ProcessHeaders(TBaseServerRequestData& rd, TBlob& postData); +}; + +template <class TRequestData> +class THttpClientRequestExtImpl: public THttpClientRequestExtension { +protected: + bool Parse(char* req) { + return THttpClientRequestExtension::Parse(req, RD); + } + bool ProcessHeaders() { + return THttpClientRequestExtension::ProcessHeaders(RD, Buf); + } + +protected: + TRequestData RD; + TBlob Buf; +}; + +using THttpClientRequestEx = THttpClientRequestExtImpl<TServerRequestData>; diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp new file mode 100644 index 00000000000..cc62bb988e7 --- /dev/null +++ b/library/cpp/http/server/http_ut.cpp @@ -0,0 +1,739 @@ +#include "http.h" +#include "http_ex.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/testing/unittest/tests_data.h> + +#include <util/generic/cast.h> +#include <util/stream/output.h> +#include <util/stream/zlib.h> +#include <util/system/datetime.h> +#include <util/system/sem.h> + +Y_UNIT_TEST_SUITE(THttpServerTest) { + class TEchoServer: public THttpServer::ICallBack { + class TRequest: public THttpClientRequestEx { + public: + inline TRequest(TEchoServer* parent) + : Parent_(parent) + { + } + + bool Reply(void* /*tsr*/) override { + if (!ProcessHeaders()) { + return true; + } + + Output() << "HTTP/1.1 200 Ok\r\n\r\n"; + if (Buf.Size()) { + Output().Write(Buf.AsCharPtr(), Buf.Size()); + } else { + Output() << Parent_->Res_; + } + Output().Finish(); + + return true; + } + + private: + TEchoServer* Parent_ = nullptr; + }; + + public: + inline TEchoServer(const TString& res) + : Res_(res) + { + } + + TClientRequest* CreateClient() override { + return new TRequest(this); + } + + private: + TString Res_; + }; + + class TSleepingServer: public THttpServer::ICallBack { + class TReplier: public TRequestReplier { + public: + inline TReplier(TSleepingServer* server) + : Server(server) + { + } + + bool DoReply(const TReplyParams& params) override { + Server->FreeThread(); + Server->Busy(1); + params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo"); + params.Output.Finish(); + Server->Replies->Inc(); + return true; + } + + private: + TSleepingServer* Server = nullptr; + }; + + public: + inline TSleepingServer(unsigned int size) + : Semaphore("conns", size) + , Semaphore2("threads", 1) + , Replies(new TAtomicCounter()) + , MaxConns(new TAtomicCounter()) + { + } + + void ResetCounters() { + Replies.Reset(new TAtomicCounter()); + MaxConns.Reset(new TAtomicCounter()); + } + + long RepliesCount() const { + return Replies->Val(); + } + + long MaxConnsCount() const { + return MaxConns->Val(); + } + + TClientRequest* CreateClient() override { + return new TReplier(this); + } + + void OnMaxConn() override { + MaxConns->Inc(); + } + + void OnFailRequest(int) override { + FreeThread(); + Busy(1); + } + + void Busy(int count) { + while (count-- > 0) { + Semaphore.Acquire(); + } + } + + void BusyThread() { + Semaphore2.Acquire(); + } + + void Free(int count) { + while (count-- > 0) { + Semaphore.Release(); + } + } + + void FreeThread() { + Semaphore2.Release(); + } + + private: + TSemaphore Semaphore; + TSemaphore Semaphore2; + THolder<TAtomicCounter> Replies; + THolder<TAtomicCounter> MaxConns; + }; + + static const TString CrLf = "\r\n"; + + struct TTestRequest { + TTestRequest(ui16 port, TString content = TString()) + : Port(port) + , Content(std::move(content)) + { + } + + void CheckContinue(TSocketInput& si) { + if (Expect100Continue) { + TStringStream ss; + TString firstLine; + si.ReadLine(firstLine); + for (;;) { + TString buf; + si.ReadLine(buf); + if (buf.size() == 0) { + break; + } + ss << buf << CrLf; + } + UNIT_ASSERT_EQUAL(firstLine, "HTTP/1.1 100 Continue"); + } + } + + TString Execute() { + TSocket* s = nullptr; + THolder<TSocket> singleReqSocket; + if (KeepAliveConnection) { + if (!KeepAlivedSocket) { + KeepAlivedSocket = MakeHolder<TSocket>(TNetworkAddress("localhost", Port), TDuration::Seconds(10)); + } + s = KeepAlivedSocket.Get(); + } else { + TNetworkAddress addr("localhost", Port); + singleReqSocket.Reset(new TSocket(addr, TDuration::Seconds(10))); + s = singleReqSocket.Get(); + } + bool isPost = Type == "POST"; + TSocketInput si(*s); + + if (UseHttpOutput) { + TSocketOutput so(*s); + THttpOutput output(&so); + + output.EnableKeepAlive(KeepAliveConnection); + output.EnableCompression(EnableResponseEncoding); + + TStringStream r; + r << Type << " / HTTP/1.1" << CrLf; + r << "Host: localhost:" + ToString(Port) << CrLf; + if (isPost) { + if (ContentEncoding.size()) { + r << "Content-Encoding: " << ContentEncoding << CrLf; + } else { + r << "Transfer-Encoding: chunked" << CrLf; + } + if (Expect100Continue) { + r << "Expect: 100-continue" << CrLf; + } + } + + r << CrLf; + if (isPost) { + output.Write(r.Str()); + output.Flush(); + CheckContinue(si); + output.Write(Content); + output.Finish(); + } else { + output.Write(r.Str()); + output.Finish(); + } + } else { + TStringStream r; + r << Type << " / HTTP/1.1" << CrLf; + r << "Host: localhost:" + ToString(Port) << CrLf; + if (KeepAliveConnection) { + r << "Connection: Keep-Alive" << CrLf; + } else { + r << "Connection: Close" << CrLf; + } + if (EnableResponseEncoding) { + r << "Accept-Encoding: gzip, deflate, x-gzip, x-deflate, y-lzo, y-lzf, y-lzq, y-bzip2, y-lzma" << CrLf; + } + if (isPost && Expect100Continue) { + r << "Expect: 100-continue" << CrLf; + } + if (isPost && ContentEncoding.size() && Content.size()) { + r << "Content-Encoding: " << ContentEncoding << CrLf; + TStringStream compressedContent; + { + TZLibCompress zlib(&compressedContent); + zlib.Write(Content.data(), Content.size()); + zlib.Flush(); + zlib.Finish(); + } + r << "Content-Length: " << compressedContent.Size() << CrLf; + r << CrLf; + s->Send(r.Data(), r.Size()); + CheckContinue(si); + Hdr = r.Str(); + TString tosend = compressedContent.Str(); + s->Send(tosend.data(), tosend.size()); + } else { + if (isPost) { + r << "Content-Length: " << Content.size() << CrLf; + r << CrLf; + s->Send(r.Data(), r.Size()); + CheckContinue(si); + Hdr = r.Str(); + s->Send(Content.data(), Content.size()); + } else { + r << CrLf; + Hdr = r.Str(); + s->Send(r.Data(), r.Size()); + } + } + } + + THttpInput input(&si); + TStringStream ss; + TransferData(&input, &ss); + + return ss.Str(); + } + + TString GetDescription() const { + if (UseHttpOutput) { + TStringStream ss; + ss << (KeepAliveConnection ? "keep-alive " : "") << Type; + if (ContentEncoding.size()) { + ss << " with encoding=" << ContentEncoding; + } + return ss.Str(); + } else { + return Hdr; + } + } + + ui16 Port = 0; + bool UseHttpOutput = true; + TString Type = "GET"; + TString ContentEncoding; + TString Content; + bool KeepAliveConnection = false; + THolder<TSocket> KeepAlivedSocket; + bool EnableResponseEncoding = false; + TString Hdr; + bool Expect100Continue = false; + }; + + class TFailingMtpQueue: public TSimpleThreadPool { + private: + bool FailOnAdd_ = false; + + public: + void SetFailOnAdd(bool fail = true) { + FailOnAdd_ = fail; + } + [[nodiscard]] bool Add(IObjectInQueue* pObj) override { + if (FailOnAdd_) { + return false; + } + + return TSimpleThreadPool::Add(pObj); + } + TFailingMtpQueue() = default; + TFailingMtpQueue(IThreadFactory* pool) + : TSimpleThreadPool(pool) + { + } + }; + + TString TestData(size_t size = 5 * 4096) { + TString res; + + for (size_t i = 0; i < size; ++i) { + res += (char)i; + } + return res; + } + + Y_UNIT_TEST(TestEchoServer) { + TString res = TestData(); + TPortManager pm; + const ui16 port = pm.GetPort(); + const bool trueFalse[] = {true, false}; + + TEchoServer serverImpl(res); + THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true)); + + for (int i = 0; i < 2; ++i) { + UNIT_ASSERT(server.Start()); + + TTestRequest r(port); + r.Content = res; + + for (bool keepAlive : trueFalse) { + r.KeepAliveConnection = keepAlive; + + // THttpOutput use chunked stream, else use Content-Length + for (bool useHttpOutput : trueFalse) { + r.UseHttpOutput = useHttpOutput; + + for (bool enableResponseEncoding : trueFalse) { + r.EnableResponseEncoding = enableResponseEncoding; + + const TString reqTypes[] = {"GET", "POST"}; + for (const TString& reqType : reqTypes) { + r.Type = reqType; + + const TString encoders[] = {"", "deflate"}; + for (const TString& encoder : encoders) { + r.ContentEncoding = encoder; + + for (bool expect100Continue : trueFalse) { + r.Expect100Continue = expect100Continue; + TString resp = r.Execute(); + UNIT_ASSERT_C(resp == res, "diff echo response for request:\n" + r.GetDescription()); + } + } + } + } + } + } + + server.Stop(); + } + } + + Y_UNIT_TEST(TestReusePortEnabled) { + if (!IsReusePortAvailable()) { + return; // skip test + } + TString res = TestData(); + TPortManager pm; + const ui16 port = pm.GetPort(); + + TEchoServer serverImpl(res); + TVector<THolder<THttpServer>> servers; + for (ui32 i = 0; i < 10; i++) { + servers.push_back(MakeHolder<THttpServer>(&serverImpl, THttpServer::TOptions(port).EnableReusePort(true))); + } + + for (ui32 testRun = 0; testRun < 3; testRun++) { + for (auto& server : servers) { + // start servers one at a time and check at least one of them is replying + UNIT_ASSERT(server->Start()); + + TTestRequest r(port, res); + UNIT_ASSERT_C(r.Execute() == res, "diff echo response for request:\n" + r.GetDescription()); + } + + for (auto& server : servers) { + // ping servers and stop them one at a time + // at the last iteration only one server is still working and then gets stopped as well + + TTestRequest r(port, res); + UNIT_ASSERT_C(r.Execute() == res, "diff echo response for request:\n" + r.GetDescription()); + + server->Stop(); + } + } + } + + Y_UNIT_TEST(TestReusePortDisabled) { + // check that with the ReusePort option disabled it's impossible to start two servers on the same port + // check that ReusePort option is disabled by default (don't set it explicitly in the test) + TPortManager pm; + const ui16 port = pm.GetPort(); + + TEchoServer serverImpl(TString{}); + THttpServer server1(&serverImpl, THttpServer::TOptions(port)); + THttpServer server2(&serverImpl, THttpServer::TOptions(port)); + + UNIT_ASSERT(true == server1.Start()); + UNIT_ASSERT(false == server2.Start()); + + server1.Stop(); + // Stop() is a sync call, port should be free by now + UNIT_ASSERT(true == server2.Start()); + UNIT_ASSERT(false == server1.Start()); + } + + Y_UNIT_TEST(TestFailServer) { + /** + * Emulate request processing failures + * Data should be large enough not to fit into socket buffer + **/ + TString res = TestData(10 * 1024 * 1024); + TPortManager portManager; + const ui16 port = portManager.GetPort(); + TEchoServer serverImpl(res); + THttpServer::TOptions options(port); + options.EnableKeepAlive(true); + options.EnableCompression(true); + using TFailingServerMtpQueue = TThreadPoolBinder<TFailingMtpQueue, THttpServer::ICallBack>; + THttpServer::TMtpQueueRef mainWorkers = new TFailingServerMtpQueue(&serverImpl, SystemThreadFactory()); + THttpServer::TMtpQueueRef failWorkers = new TThreadPool(SystemThreadFactory()); + THttpServer server(&serverImpl, mainWorkers, failWorkers, options); + + UNIT_ASSERT(server.Start()); + for (size_t i = 0; i < 3; ++i) { + // should fail on 2nd request + static_cast<TFailingMtpQueue*>(mainWorkers.Get())->SetFailOnAdd(i == 1); + TTestRequest r(port); + r.Content = res; + r.Type = "POST"; + TString resp = r.Execute(); + if (i == 1) { + UNIT_ASSERT(resp.Contains("Service Unavailable")); + } else { + UNIT_ASSERT_C(resp == res, "diff echo response for request:\n" + r.GetDescription()); + } + } + server.Stop(); + } + + class TReleaseConnectionServer: public THttpServer::ICallBack { + class TRequest: public THttpClientRequestEx { + public: + bool Reply(void* /*tsr*/) override { + Output() << "HTTP/1.1 200 Ok\r\n\r\n"; + Output() << "reply"; + Output().Finish(); + + ReleaseConnection(); + + throw yexception() << "some error"; + + return true; + } + }; + + public: + TClientRequest* CreateClient() override { + return new TRequest(); + } + + void OnException() override { + ExceptionMessage = CurrentExceptionMessage(); + } + + TString ExceptionMessage; + }; + + class TResetConnectionServer: public THttpServer::ICallBack { + class TRequest: public TClientRequest { + public: + bool Reply(void* /*tsr*/) override { + Output() << "HTTP/1.1"; + ResetConnection(); + + return true; + } + }; + + public: + TClientRequest* CreateClient() override { + return new TRequest(); + } + + void OnException() override { + ExceptionMessage = CurrentExceptionMessage(); + } + + TString ExceptionMessage; + }; + + class TListenerSockAddrReplyServer: public THttpServer::ICallBack { + class TRequest: public TClientRequest { + public: + bool Reply(void* /*tsr*/) override { + Output() << "HTTP/1.1 200 Ok\r\n\r\n"; + Output() << PrintHostAndPort(*GetListenerSockAddrRef()); + + Output().Finish(); + + return true; + } + }; + + public: + TClientRequest* CreateClient() override { + return new TRequest(); + } + }; + + Y_UNIT_TEST(TTestResetConnection) { + TPortManager pm; + const ui16 port = pm.GetPort(); + + TResetConnectionServer serverImpl; + THttpServer server(&serverImpl, THttpServer::TOptions(port)); + UNIT_ASSERT(server.Start()); + + TTestRequest r(port, "request"); + + UNIT_ASSERT_EXCEPTION_CONTAINS(r.Execute(), TSystemError, "Connection reset by peer"); + + server.Stop(); + }; + + Y_UNIT_TEST(TTestReleaseConnection) { + TPortManager pm; + const ui16 port = pm.GetPort(); + + TReleaseConnectionServer serverImpl; + THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true)); + UNIT_ASSERT(server.Start()); + + TTestRequest r(port, "request"); + r.KeepAliveConnection = true; + + UNIT_ASSERT_C(r.Execute() == "reply", "diff echo response for request:\n" + r.GetDescription()); + + server.Stop(); + + UNIT_ASSERT_STRINGS_EQUAL(serverImpl.ExceptionMessage, "(yexception) some error"); + }; + + THttpInput SendRequest(TSocket& socket, ui16 port) { + TSocketInput si(socket); + TSocketOutput so(socket); + THttpOutput out(&so); + out.EnableKeepAlive(true); + out << "GET / HTTP/1.1" << CrLf; + out << "Host: localhost:" + ToString(port) << CrLf; + out << CrLf; + out.Flush(); + + THttpInput input(&si); + input.ReadAll(); + return input; + } + + THttpInput SendRequestWithBody(TSocket& socket, ui16 port, TString body) { + TSocketInput si(socket); + TSocketOutput so(socket); + THttpOutput out(&so); + out << "POST / HTTP/1.1" << CrLf; + out << "Host: localhost:" + ToString(port) << CrLf; + out << "Content-Length: " + ToString(body.size()) << CrLf; + out << CrLf; + out << body; + out.Flush(); + + THttpInput input(&si); + input.ReadAll(); + return input; + } + + Y_UNIT_TEST(TTestExpirationTimeout) { + TPortManager pm; + const ui16 port = pm.GetPort(); + + TEchoServer serverImpl("test_data"); + THttpServer::TOptions options(port); + options.nThreads = 1; + options.MaxQueueSize = 0; + options.MaxConnections = 0; + options.KeepAliveEnabled = true; + options.ExpirationTimeout = TDuration::Seconds(1); + options.PollTimeout = TDuration::MilliSeconds(100); + THttpServer server(&serverImpl, options); + UNIT_ASSERT(server.Start()); + + TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(10)); + + SendRequest(socket, port); + SendRequest(socket, port); + + Sleep(TDuration::Seconds(5)); + UNIT_ASSERT_EXCEPTION(SendRequest(socket, port), THttpReadException); + + server.Stop(); + } + + Y_UNIT_TEST(TTestContentLengthTooLarge) { + TPortManager pm; + const ui16 port = pm.GetPort(); + + TEchoServer serverImpl("test_data"); + THttpServer::TOptions options(port); + options.nThreads = 1; + options.MaxQueueSize = 0; + options.MaxInputContentLength = 2_KB; + options.MaxConnections = 0; + options.KeepAliveEnabled = false; + options.ExpirationTimeout = TDuration::Seconds(1); + options.PollTimeout = TDuration::MilliSeconds(100); + THttpServer server(&serverImpl, options); + UNIT_ASSERT(server.Start()); + + TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(5)); + UNIT_ASSERT_STRING_CONTAINS(SendRequestWithBody(socket, port, TString(1_KB, 'a')).FirstLine(), "HTTP/1.1 200 Ok"); + + TSocket socket2(TNetworkAddress("localhost", port), TDuration::Seconds(5)); + UNIT_ASSERT_STRING_CONTAINS(SendRequestWithBody(socket2, port, TString(10_KB, 'a')).FirstLine(), "HTTP/1.1 413 Payload Too Large"); + + server.Stop(); + } + + + Y_UNIT_TEST(TTestCloseConnectionOnRequestLimit) { + TPortManager pm; + const ui16 port = pm.GetPort(); + + TEchoServer serverImpl("test_data"); + THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxRequestsPerConnection(2)); + UNIT_ASSERT(server.Start()); + + TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(10)); + + UNIT_ASSERT(SendRequest(socket, port).IsKeepAlive()); + UNIT_ASSERT(!SendRequest(socket, port).IsKeepAlive()); + + UNIT_ASSERT_EXCEPTION(SendRequest(socket, port), THttpReadException); + + server.Stop(); + } + + Y_UNIT_TEST(TTestListenerSockAddrConnection) { + TPortManager pm; + const ui16 port1 = pm.GetPort(); + const ui16 port2 = pm.GetPort(); + + TListenerSockAddrReplyServer serverImpl; + THttpServer server(&serverImpl, THttpServer::TOptions().EnableKeepAlive(true).AddBindAddress("127.0.0.1", port1).AddBindAddress("127.0.0.1", port2)); + UNIT_ASSERT(server.Start()); + + TTestRequest r1(port1); + r1.KeepAliveConnection = true; + + TString resp = r1.Execute(); + UNIT_ASSERT(resp == TString::Join("127.0.0.1", ":", ToString(port1))); + + TTestRequest r2(port2); + r2.KeepAliveConnection = true; + + resp = r2.Execute(); + UNIT_ASSERT(resp == TString::Join("127.0.0.1", ":", ToString(port2))); + + server.Stop(); + }; + +#if 0 + Y_UNIT_TEST(TestSocketsLeak) { + const bool trueFalse[] = {true, false}; + TPortManager portManager; + const ui16 port = portManager.GetPort(); + TString res = TestData(25); + TSleepingServer server(3); + THttpServer::TOptions options(port); + options.MaxConnections = 1; + options.MaxQueueSize = 1; + options.MaxFQueueSize = 2; + options.nFThreads = 2; + options.KeepAliveEnabled = true; + options.RejectExcessConnections = true; + THttpServer srv(&server, options); + UNIT_ASSERT(srv.Start()); + + for (bool keepAlive : trueFalse) { + server.ResetCounters(); + TVector<TAutoPtr<IThreadFactory::IThread>> threads; + + server.Busy(3); + server.BusyThread(); + + for (size_t i = 0; i < 3; ++i) { + auto func = [&server, port, keepAlive]() { + server.BusyThread(); + THolder<TTestRequest> r = MakeHolder<TTestRequest>(port); + r->KeepAliveConnection = keepAlive; + r->Execute(); + }; + threads.push_back(SystemThreadFactory()->Run(func)); + } + + server.FreeThread(); // all threads get connection & go to processing + Sleep(TDuration::MilliSeconds(100)); + server.BusyThread(); // we wait while connections are established by the + // system and accepted by the server + server.Free(3); // we release all connections processing + + for (auto&& thread : threads) { + thread->Join(); + } + + server.Free(3); + server.FreeThread(); + + UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount())); + UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount())); + } + } +#endif +} diff --git a/library/cpp/http/server/options.cpp b/library/cpp/http/server/options.cpp new file mode 100644 index 00000000000..05c954384a2 --- /dev/null +++ b/library/cpp/http/server/options.cpp @@ -0,0 +1,43 @@ +#include "options.h" + +#include <util/string/cast.h> +#include <util/digest/numeric.h> +#include <util/network/ip.h> +#include <util/network/socket.h> +#include <util/generic/hash_set.h> +#include <util/generic/yexception.h> + +using TAddr = THttpServerOptions::TAddr; + +static inline TString AddrToString(const TAddr& addr) { + return addr.Addr + ":" + ToString(addr.Port); +} + +static inline TNetworkAddress ToNetworkAddr(const TString& address, ui16 port) { + if (address.empty() || address == TStringBuf("*")) { + return TNetworkAddress(port); + } + + return TNetworkAddress(address, port); +} + +void THttpServerOptions::BindAddresses(TBindAddresses& ret) const { + THashSet<TString> check; + + for (auto addr : BindSockaddr) { + if (!addr.Port) { + addr.Port = Port; + } + + const TString straddr = AddrToString(addr); + + if (check.find(straddr) == check.end()) { + check.insert(straddr); + ret.push_back(ToNetworkAddr(addr.Addr, addr.Port)); + } + } + + if (ret.empty()) { + ret.push_back(Host ? TNetworkAddress(Host, Port) : TNetworkAddress(Port)); + } +} diff --git a/library/cpp/http/server/options.h b/library/cpp/http/server/options.h new file mode 100644 index 00000000000..38eda0e5e78 --- /dev/null +++ b/library/cpp/http/server/options.h @@ -0,0 +1,176 @@ +#pragma once + +#include <util/network/ip.h> +#include <util/network/init.h> +#include <util/network/address.h> +#include <util/generic/size_literals.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> +#include <util/datetime/base.h> + +class THttpServerOptions { +public: + inline THttpServerOptions(ui16 port = 17000) noexcept + : Port(port) + { + } + + using TBindAddresses = TVector<TNetworkAddress>; + void BindAddresses(TBindAddresses& ret) const; + + inline THttpServerOptions& AddBindAddress(const TString& address, ui16 port) { + const TAddr addr = { + address, + port, + }; + + BindSockaddr.push_back(addr); + return *this; + } + + inline THttpServerOptions& AddBindAddress(const TString& address) { + return AddBindAddress(address, 0); + } + + inline THttpServerOptions& EnableKeepAlive(bool enable) noexcept { + KeepAliveEnabled = enable; + + return *this; + } + + inline THttpServerOptions& EnableCompression(bool enable) noexcept { + CompressionEnabled = enable; + + return *this; + } + + inline THttpServerOptions& EnableRejectExcessConnections(bool enable) noexcept { + RejectExcessConnections = enable; + + return *this; + } + + inline THttpServerOptions& EnableReusePort(bool enable) noexcept { + ReusePort = enable; + + return *this; + } + + inline THttpServerOptions& EnableReuseAddress(bool enable) noexcept { + ReuseAddress = enable; + + return *this; + } + + inline THttpServerOptions& SetThreads(ui32 threads) noexcept { + nThreads = threads; + + return *this; + } + + /// Default interface name to bind the server. Used when none of BindAddress are provided. + inline THttpServerOptions& SetHost(const TString& host) noexcept { + Host = host; + + return *this; + } + + /// Default port to bind the server. Used when none of BindAddress are provided. + inline THttpServerOptions& SetPort(ui16 port) noexcept { + Port = port; + + return *this; + } + + inline THttpServerOptions& SetMaxConnections(ui32 mc = 0) noexcept { + MaxConnections = mc; + + return *this; + } + + inline THttpServerOptions& SetMaxQueueSize(ui32 mqs = 0) noexcept { + MaxQueueSize = mqs; + + return *this; + } + + inline THttpServerOptions& SetClientTimeout(const TDuration& timeout) noexcept { + ClientTimeout = timeout; + + return *this; + } + + inline THttpServerOptions& SetListenBacklog(int val) noexcept { + ListenBacklog = val; + + return *this; + } + + inline THttpServerOptions& SetOutputBufferSize(size_t val) noexcept { + OutputBufferSize = val; + + return *this; + } + + inline THttpServerOptions& SetMaxInputContentLength(ui64 val) noexcept { + MaxInputContentLength = val; + + return *this; + } + + inline THttpServerOptions& SetMaxRequestsPerConnection(size_t val) noexcept { + MaxRequestsPerConnection = val; + + return *this; + } + + /// Use TElasticQueue instead of TThreadPool for request queues + inline THttpServerOptions& EnableElasticQueues(bool enable) noexcept { + UseElasticQueues = enable; + + return *this; + } + + inline THttpServerOptions& SetThreadsName(const TString& listenThreadName, const TString& requestsThreadName, const TString& failRequestsThreadName) noexcept { + ListenThreadName = listenThreadName; + RequestsThreadName = requestsThreadName; + FailRequestsThreadName = failRequestsThreadName; + + return *this; + } + + struct TAddr { + TString Addr; + ui16 Port; + }; + + typedef TVector<TAddr> TAddrs; + + bool KeepAliveEnabled = true; + bool CompressionEnabled = false; + bool RejectExcessConnections = false; + bool ReusePort = false; // set SO_REUSEPORT socket option + bool ReuseAddress = true; // set SO_REUSEADDR socket option + TAddrs BindSockaddr; + ui16 Port = 17000; // The port on which to run the web server + TString Host; // DNS entry + const char* ServerName = "YWS/1.0"; // The Web server name to return in HTTP headers + ui32 nThreads = 0; // Thread count for requests processing + ui32 MaxQueueSize = 0; // Max allowed request count in queue + ui32 nFThreads = 1; + ui32 MaxFQueueSize = 0; + ui32 MaxConnections = 100; + int ListenBacklog = SOMAXCONN; + TDuration ClientTimeout; + size_t OutputBufferSize = 0; + ui64 MaxInputContentLength = sizeof(size_t) <= 4 ? 2_GB : 64_GB; + size_t MaxRequestsPerConnection = 0; // If keep-alive is enabled, request limit before connection is closed + bool UseElasticQueues = false; + + TDuration PollTimeout; // timeout of TSocketPoller::WaitT call + TDuration ExpirationTimeout; // drop inactive connections after ExpirationTimeout (should be > 0) + + TString ListenThreadName = "HttpListen"; + TString RequestsThreadName = "HttpServer"; + TString FailRequestsThreadName = "HttpServer"; +}; diff --git a/library/cpp/http/server/response.cpp b/library/cpp/http/server/response.cpp new file mode 100644 index 00000000000..52d64c91cec --- /dev/null +++ b/library/cpp/http/server/response.cpp @@ -0,0 +1,65 @@ +#include "response.h" + +#include <util/stream/output.h> +#include <util/stream/mem.h> +#include <util/string/cast.h> + +THttpResponse& THttpResponse::AddMultipleHeaders(const THttpHeaders& headers) { + for (THttpHeaders::TConstIterator i = headers.Begin(); i != headers.End(); ++i) { + this->Headers.AddHeader(*i); + } + return *this; +} + +THttpResponse& THttpResponse::SetContentType(const TStringBuf& contentType) { + Headers.AddOrReplaceHeader(THttpInputHeader("Content-Type", ToString(contentType))); + + return *this; +} + +void THttpResponse::OutTo(IOutputStream& os) const { + TVector<IOutputStream::TPart> parts; + const size_t FIRST_LINE_PARTS = 3; + const size_t HEADERS_PARTS = Headers.Count() * 4; + const size_t CONTENT_PARTS = 5; + parts.reserve(FIRST_LINE_PARTS + HEADERS_PARTS + CONTENT_PARTS); + + // first line + parts.push_back(IOutputStream::TPart(TStringBuf("HTTP/1.1 "))); + parts.push_back(IOutputStream::TPart(HttpCodeStrEx(Code))); + parts.push_back(IOutputStream::TPart::CrLf()); + + // headers + for (THttpHeaders::TConstIterator i = Headers.Begin(); i != Headers.End(); ++i) { + parts.push_back(IOutputStream::TPart(i->Name())); + parts.push_back(IOutputStream::TPart(TStringBuf(": "))); + parts.push_back(IOutputStream::TPart(i->Value())); + parts.push_back(IOutputStream::TPart::CrLf()); + } + + char buf[50]; + + if (!Content.empty()) { + TMemoryOutput mo(buf, sizeof(buf)); + + mo << Content.size(); + + parts.push_back(IOutputStream::TPart(TStringBuf("Content-Length: "))); + parts.push_back(IOutputStream::TPart(buf, mo.Buf() - buf)); + parts.push_back(IOutputStream::TPart::CrLf()); + } + + // content + parts.push_back(IOutputStream::TPart::CrLf()); + + if (!Content.empty()) { + parts.push_back(IOutputStream::TPart(Content)); + } + + os.Write(parts.data(), parts.size()); +} + +template <> +void Out<THttpResponse>(IOutputStream& os, const THttpResponse& resp) { + resp.OutTo(os); +} diff --git a/library/cpp/http/server/response.h b/library/cpp/http/server/response.h new file mode 100644 index 00000000000..a75cb85605f --- /dev/null +++ b/library/cpp/http/server/response.h @@ -0,0 +1,82 @@ +#pragma once + +#include <library/cpp/http/misc/httpcodes.h> +#include <library/cpp/http/io/stream.h> + +#include <util/generic/strbuf.h> +#include <util/string/cast.h> + +class THttpHeaders; +class IOutputStream; + +class THttpResponse { +public: + THttpResponse() noexcept + : Code(HTTP_OK) + { + } + + explicit THttpResponse(HttpCodes code) noexcept + : Code(code) + { + } + + template <typename ValueType> + THttpResponse& AddHeader(const TString& name, const ValueType& value) { + return AddHeader(THttpInputHeader(name, ToString(value))); + } + + THttpResponse& AddHeader(const THttpInputHeader& header) { + Headers.AddHeader(header); + + return *this; + } + + THttpResponse& AddMultipleHeaders(const THttpHeaders& headers); + + const THttpHeaders& GetHeaders() const { + return Headers; + } + + THttpResponse& SetContentType(const TStringBuf& contentType); + + /** + * @note If @arg content isn't empty its size is automatically added as a + * "Content-Length" header during output to IOutputStream. + * @see IOutputStream& operator << (IOutputStream&, const THttpResponse&) + */ + THttpResponse& SetContent(const TString& content) { + Content = content; + + return *this; + } + + TString GetContent() const { + return Content; + } + + /** + * @note If @arg content isn't empty its size is automatically added as a + * "Content-Length" header during output to IOutputStream. + * @see IOutputStream& operator << (IOutputStream&, const THttpResponse&) + */ + THttpResponse& SetContent(const TString& content, const TStringBuf& contentType) { + return SetContent(content).SetContentType(contentType); + } + + HttpCodes HttpCode() const { + return Code; + } + + THttpResponse& SetHttpCode(HttpCodes code) { + Code = code; + return *this; + } + + void OutTo(IOutputStream& out) const; + +private: + HttpCodes Code; + THttpHeaders Headers; + TString Content; +}; diff --git a/library/cpp/http/server/response_ut.cpp b/library/cpp/http/server/response_ut.cpp new file mode 100644 index 00000000000..73e2112ad36 --- /dev/null +++ b/library/cpp/http/server/response_ut.cpp @@ -0,0 +1,142 @@ +#include "response.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/string/cast.h> + +Y_UNIT_TEST_SUITE(TestHttpResponse) { + Y_UNIT_TEST(TestCodeOnly) { + UNIT_ASSERT_STRINGS_EQUAL(ToString(THttpResponse()), "HTTP/1.1 200 Ok\r\n\r\n"); + UNIT_ASSERT_STRINGS_EQUAL(ToString(THttpResponse(HTTP_NOT_FOUND)), "HTTP/1.1 404 Not found\r\n\r\n"); + } + + Y_UNIT_TEST(TestRedirect) { + THttpResponse resp = THttpResponse(HTTP_FOUND).AddHeader("Location", "yandex.ru"); + UNIT_ASSERT_STRINGS_EQUAL(ToString(resp), "HTTP/1.1 302 Moved temporarily\r\n" + "Location: yandex.ru\r\n" + "\r\n"); + } + + Y_UNIT_TEST(TestAddHeader) { + THttpResponse resp(HTTP_FORBIDDEN); + resp.AddHeader(THttpInputHeader("X-Header-1", "ValueOne")); + resp.AddHeader("X-Header-2", 10); + resp.AddHeader("X-Header-3", true); + + const char* EXPECTED = "HTTP/1.1 403 Forbidden\r\n" + "X-Header-1: ValueOne\r\n" + "X-Header-2: 10\r\n" + "X-Header-3: 1\r\n" + "\r\n"; + UNIT_ASSERT_STRINGS_EQUAL(ToString(resp), EXPECTED); + } + + Y_UNIT_TEST(TestAddMultipleHeaders) { + THttpHeaders headers; + headers.AddHeader(THttpInputHeader("X-Header-1", "ValueOne")); + headers.AddHeader(THttpInputHeader("X-Header-2", "ValueTwo")); + headers.AddHeader(THttpInputHeader("X-Header-3", "ValueThree")); + + const char* EXPECTED = "HTTP/1.1 403 Forbidden\r\n" + "X-Header-1: ValueOne\r\n" + "X-Header-2: ValueTwo\r\n" + "X-Header-3: ValueThree\r\n" + "\r\n"; + UNIT_ASSERT_STRINGS_EQUAL(ToString(THttpResponse(HTTP_FORBIDDEN).AddMultipleHeaders(headers)), + EXPECTED); + } + + Y_UNIT_TEST(TestGetHeaders) { + THttpResponse resp(HTTP_FORBIDDEN); + + THttpHeaders headers; + headers.AddHeader(THttpInputHeader("X-Header-1", "ValueOne")); + headers.AddHeader(THttpInputHeader("X-Header-2", "ValueTwo")); + headers.AddHeader(THttpInputHeader("X-Header-3", "ValueThree")); + resp.AddMultipleHeaders(headers); + resp.AddHeader("X-Header-4", "ValueFour"); + + const THttpHeaders& gotHeaders = resp.GetHeaders(); + UNIT_ASSERT_VALUES_EQUAL(gotHeaders.Count(), 4); + UNIT_ASSERT(gotHeaders.HasHeader("X-Header-1")); + UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-1")->Value(), "ValueOne"); + UNIT_ASSERT(gotHeaders.HasHeader("X-Header-4")); + UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-4")->Value(), "ValueFour"); + } + + + Y_UNIT_TEST(TestSetContent) { + const char* EXPECTED = "HTTP/1.1 200 Ok\r\n" + "Content-Length: 10\r\n" + "\r\n" + "0123456789"; + UNIT_ASSERT_STRINGS_EQUAL(ToString(THttpResponse().SetContent("0123456789")), + EXPECTED); + } + + Y_UNIT_TEST(TestSetContentWithContentType) { + const char* EXPECTED = "HTTP/1.1 200 Ok\r\n" + "Content-Type: text/xml\r\n" + "Content-Length: 28\r\n" + "\r\n" + "<xml><tag value=\"1\" /></xml>"; + THttpResponse resp; + resp.SetContent("<xml><tag value=\"1\" /></xml>").SetContentType("text/xml"); + UNIT_ASSERT_STRINGS_EQUAL(ToString(resp), EXPECTED); + } + + Y_UNIT_TEST(TestCopyConstructor) { + THttpResponse resp(HTTP_FORBIDDEN); + resp.AddHeader(THttpInputHeader("X-Header-1", "ValueOne")) + .AddHeader("X-Header-2", "ValueTwo") + .AddHeader(THttpInputHeader("X-Header-3", "ValueThree")) + .SetContent("Some stuff") + .SetContentType("text/plain"); + + THttpResponse copy = resp; + UNIT_ASSERT_STRINGS_EQUAL(ToString(copy), ToString(resp)); + } + + Y_UNIT_TEST(TestAssignment) { + THttpResponse resp(HTTP_FORBIDDEN); + resp.AddHeader(THttpInputHeader("X-Header-1", "ValueOne")); + resp.AddHeader(THttpInputHeader("X-Header-2", "ValueTwo")); + resp.AddHeader(THttpInputHeader("X-Header-3", "ValueThree")); + resp.SetContent("Some stuff").SetContentType("text/plain"); + + THttpResponse copy; + copy = resp; + UNIT_ASSERT_STRINGS_EQUAL(ToString(copy), ToString(resp)); + } + + Y_UNIT_TEST(TestEmptyContent) { + UNIT_ASSERT_STRINGS_EQUAL(ToString(THttpResponse().SetContent("")), "HTTP/1.1 200 Ok\r\n\r\n"); + } + + Y_UNIT_TEST(TestReturnReference) { + THttpResponse resp; + UNIT_ASSERT_EQUAL(&resp, &resp.AddHeader("Header1", 1)); + UNIT_ASSERT_EQUAL(&resp, &resp.AddHeader(THttpInputHeader("Header2", "2"))); + + THttpHeaders headers; + headers.AddHeader(THttpInputHeader("Header3", "3")); + headers.AddHeader(THttpInputHeader("Header4", "4")); + UNIT_ASSERT_EQUAL(&resp, &resp.AddMultipleHeaders(headers)); + + UNIT_ASSERT_EQUAL(&resp, &resp.SetContent("some stuff")); + UNIT_ASSERT_EQUAL(&resp, &resp.SetContent("some other stuff").SetContentType("text/plain")); + } + + Y_UNIT_TEST(TestSetContentType) { + const char* EXPECTED = "HTTP/1.1 200 Ok\r\n" + "Content-Type: text/xml\r\n" + "Content-Length: 28\r\n" + "\r\n" + "<xml><tag value=\"1\" /></xml>"; + THttpResponse resp; + resp.SetContent("<xml><tag value=\"1\" /></xml>") + .SetContentType("application/json") + .SetContentType("text/xml"); + UNIT_ASSERT_STRINGS_EQUAL(ToString(resp), EXPECTED); + } +} diff --git a/library/cpp/http/server/ut/ya.make b/library/cpp/http/server/ut/ya.make new file mode 100644 index 00000000000..bcb4d4c0b88 --- /dev/null +++ b/library/cpp/http/server/ut/ya.make @@ -0,0 +1,12 @@ +UNITTEST_FOR(library/cpp/http/server) + +OWNER(pg) + +SIZE(MEDIUM) + +SRCS( + http_ut.cpp + response_ut.cpp +) + +END() diff --git a/library/cpp/http/server/ya.make b/library/cpp/http/server/ya.make new file mode 100644 index 00000000000..bae6f33306b --- /dev/null +++ b/library/cpp/http/server/ya.make @@ -0,0 +1,27 @@ +LIBRARY() + +OWNER( + pg + mvel + kulikov + g:base + g:middle +) + +SRCS( + conn.cpp + http.cpp + http_ex.cpp + options.cpp + response.cpp +) + +PEERDIR( + library/cpp/http/misc + library/cpp/http/io + library/cpp/threading/equeue +) + +END() + +RECURSE_FOR_TESTS(ut) |