aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authormarkov <markov@yandex-team.ru>2022-02-10 16:49:38 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:38 +0300
commit22a7d4be7e5b779e4deee4dfe36234a1eae13fbd (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp
parentc35714eae9ffbb7be89036495af366fa51a8c097 (diff)
downloadydb-22a7d4be7e5b779e4deee4dfe36234a1eae13fbd.tar.gz
Restoring authorship annotation for <markov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/http/server/http.cpp48
-rw-r--r--library/cpp/http/server/http.h6
-rw-r--r--library/cpp/http/server/http_ut.cpp266
-rw-r--r--library/cpp/http/server/options.h14
4 files changed, 167 insertions, 167 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp
index 7ae2afe553..128583bdd7 100644
--- a/library/cpp/http/server/http.cpp
+++ b/library/cpp/http/server/http.cpp
@@ -59,13 +59,13 @@ public:
inline void Activate(TInstant now) noexcept;
inline void DeActivate();
- inline void Reject();
+ inline void Reject();
public:
TSocket Socket_;
NAddr::IRemoteAddrRef ListenerSockAddrRef_;
THttpServer::TImpl* HttpServ_ = nullptr;
- bool Reject_ = false;
+ bool Reject_ = false;
TInstant LastUsed;
TInstant AcceptMoment;
size_t ReceivedRequests = 0;
@@ -112,7 +112,7 @@ public:
bool TryRemovingUnsafe(TInstant border) noexcept {
if (Conns_.Empty()) {
- return false;
+ return false;
}
TClientConnection* c = &*(Conns_.Begin());
if (c->LastUsed > border) {
@@ -120,7 +120,7 @@ public:
}
EraseUnsafe(c);
delete c;
- return true;
+ return true;
}
void EraseUnsafe(TClientConnection* c) noexcept {
@@ -154,13 +154,13 @@ public:
}
void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) {
- if (MaxRequestsReached()) {
+ if (MaxRequestsReached()) {
Cb_->OnMaxConn();
bool wasRemoved = Connections->RemoveOld(TInstant::Max());
- if (!wasRemoved && Options_.RejectExcessConnections) {
+ if (!wasRemoved && Options_.RejectExcessConnections) {
(new TClientConnection(s, this, listenerSockAddrRef))->Reject();
- return;
- }
+ return;
+ }
}
auto connection = new TClientConnection(s, this, listenerSockAddrRef);
@@ -242,7 +242,7 @@ public:
// ignore result
}
- void AddRequest(TAutoPtr<TClientRequest> req, bool fail) {
+ void AddRequest(TAutoPtr<TClientRequest> req, bool fail) {
struct TFailRequest: public THttpClientRequestEx {
inline TFailRequest(TAutoPtr<TClientRequest> parent) {
Conn_.Reset(parent->Conn_.Release());
@@ -259,7 +259,7 @@ public:
}
};
- if (!fail && Requests->Add(req.Get())) {
+ if (!fail && Requests->Add(req.Get())) {
Y_UNUSED(req.Release());
} else {
req = new TFailRequest(req);
@@ -438,10 +438,10 @@ public:
return AtomicGet(ConnectionCount);
}
- inline bool MaxRequestsReached() const {
+ inline bool MaxRequestsReached() const {
return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections);
- }
-
+ }
+
THolder<TThread> ListenThread;
TPipeHandle ListenWakeupReadFd;
TPipeHandle ListenWakeupWriteFd;
@@ -547,10 +547,10 @@ const IThreadPool& THttpServer::GetFailQueue() const {
return Impl_->GetFailQueue();
}
-bool THttpServer::MaxRequestsReached() const {
- return Impl_->MaxRequestsReached();
-}
-
+bool THttpServer::MaxRequestsReached() const {
+ return Impl_->MaxRequestsReached();
+}
+
TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef)
: Socket_(s)
, ListenerSockAddrRef_(listenerSockAddrRef)
@@ -591,7 +591,7 @@ void TClientConnection::OnPollEvent(TInstant now) {
THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_));
AcceptMoment = now;
- HttpServ_->AddRequest(obj, Reject_);
+ HttpServ_->AddRequest(obj, Reject_);
}
void TClientConnection::Activate(TInstant now) noexcept {
@@ -604,12 +604,12 @@ void TClientConnection::DeActivate() {
HttpServ_->Connections->Add(this);
}
-void TClientConnection::Reject() {
- Reject_ = true;
-
- HttpServ_->Connections->Add(this);
-}
-
+void TClientConnection::Reject() {
+ Reject_ = true;
+
+ HttpServ_->Connections->Add(this);
+}
+
TClientRequest::TClientRequest() {
}
diff --git a/library/cpp/http/server/http.h b/library/cpp/http/server/http.h
index 1639e3ff59..b292d38f27 100644
--- a/library/cpp/http/server/http.h
+++ b/library/cpp/http/server/http.h
@@ -93,9 +93,9 @@ public:
static TAtomicBase AcceptReturnsInvalidSocketCounter();
private:
- bool MaxRequestsReached() const;
-
-private:
+ bool MaxRequestsReached() const;
+
+private:
THolder<TImpl> Impl_;
};
diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp
index 1674efa4d2..cc62bb988e 100644
--- a/library/cpp/http/server/http_ut.cpp
+++ b/library/cpp/http/server/http_ut.cpp
@@ -8,7 +8,7 @@
#include <util/stream/output.h>
#include <util/stream/zlib.h>
#include <util/system/datetime.h>
-#include <util/system/sem.h>
+#include <util/system/sem.h>
Y_UNIT_TEST_SUITE(THttpServerTest) {
class TEchoServer: public THttpServer::ICallBack {
@@ -53,89 +53,89 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
TString Res_;
};
- class TSleepingServer: public THttpServer::ICallBack {
- class TReplier: public TRequestReplier {
- public:
- inline TReplier(TSleepingServer* server)
- : Server(server)
- {
- }
-
- bool DoReply(const TReplyParams& params) override {
- Server->FreeThread();
- Server->Busy(1);
- params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo");
- params.Output.Finish();
- Server->Replies->Inc();
- return true;
- }
-
- private:
- TSleepingServer* Server = nullptr;
- };
-
- public:
- inline TSleepingServer(unsigned int size)
- : Semaphore("conns", size)
- , Semaphore2("threads", 1)
- , Replies(new TAtomicCounter())
- , MaxConns(new TAtomicCounter())
- {
- }
-
- void ResetCounters() {
- Replies.Reset(new TAtomicCounter());
- MaxConns.Reset(new TAtomicCounter());
- }
-
- long RepliesCount() const {
- return Replies->Val();
- }
-
- long MaxConnsCount() const {
- return MaxConns->Val();
- }
-
- TClientRequest* CreateClient() override {
- return new TReplier(this);
- }
-
- void OnMaxConn() override {
- MaxConns->Inc();
- }
-
- void OnFailRequest(int) override {
- FreeThread();
- Busy(1);
- }
-
- void Busy(int count) {
- while (count-- > 0) {
- Semaphore.Acquire();
- }
- }
-
- void BusyThread() {
- Semaphore2.Acquire();
- }
-
- void Free(int count) {
- while (count-- > 0) {
- Semaphore.Release();
- }
- }
-
- void FreeThread() {
- Semaphore2.Release();
- }
-
- private:
- TSemaphore Semaphore;
- TSemaphore Semaphore2;
- THolder<TAtomicCounter> Replies;
- THolder<TAtomicCounter> MaxConns;
- };
-
+ class TSleepingServer: public THttpServer::ICallBack {
+ class TReplier: public TRequestReplier {
+ public:
+ inline TReplier(TSleepingServer* server)
+ : Server(server)
+ {
+ }
+
+ bool DoReply(const TReplyParams& params) override {
+ Server->FreeThread();
+ Server->Busy(1);
+ params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo");
+ params.Output.Finish();
+ Server->Replies->Inc();
+ return true;
+ }
+
+ private:
+ TSleepingServer* Server = nullptr;
+ };
+
+ public:
+ inline TSleepingServer(unsigned int size)
+ : Semaphore("conns", size)
+ , Semaphore2("threads", 1)
+ , Replies(new TAtomicCounter())
+ , MaxConns(new TAtomicCounter())
+ {
+ }
+
+ void ResetCounters() {
+ Replies.Reset(new TAtomicCounter());
+ MaxConns.Reset(new TAtomicCounter());
+ }
+
+ long RepliesCount() const {
+ return Replies->Val();
+ }
+
+ long MaxConnsCount() const {
+ return MaxConns->Val();
+ }
+
+ TClientRequest* CreateClient() override {
+ return new TReplier(this);
+ }
+
+ void OnMaxConn() override {
+ MaxConns->Inc();
+ }
+
+ void OnFailRequest(int) override {
+ FreeThread();
+ Busy(1);
+ }
+
+ void Busy(int count) {
+ while (count-- > 0) {
+ Semaphore.Acquire();
+ }
+ }
+
+ void BusyThread() {
+ Semaphore2.Acquire();
+ }
+
+ void Free(int count) {
+ while (count-- > 0) {
+ Semaphore.Release();
+ }
+ }
+
+ void FreeThread() {
+ Semaphore2.Release();
+ }
+
+ private:
+ TSemaphore Semaphore;
+ TSemaphore Semaphore2;
+ THolder<TAtomicCounter> Replies;
+ THolder<TAtomicCounter> MaxConns;
+ };
+
static const TString CrLf = "\r\n";
struct TTestRequest {
@@ -166,10 +166,10 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
TSocket* s = nullptr;
THolder<TSocket> singleReqSocket;
if (KeepAliveConnection) {
- if (!KeepAlivedSocket) {
+ if (!KeepAlivedSocket) {
KeepAlivedSocket = MakeHolder<TSocket>(TNetworkAddress("localhost", Port), TDuration::Seconds(10));
- }
- s = KeepAlivedSocket.Get();
+ }
+ s = KeepAlivedSocket.Get();
} else {
TNetworkAddress addr("localhost", Port);
singleReqSocket.Reset(new TSocket(addr, TDuration::Seconds(10)));
@@ -283,7 +283,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
TString ContentEncoding;
TString Content;
bool KeepAliveConnection = false;
- THolder<TSocket> KeepAlivedSocket;
+ THolder<TSocket> KeepAlivedSocket;
bool EnableResponseEncoding = false;
TString Hdr;
bool Expect100Continue = false;
@@ -455,7 +455,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
}
server.Stop();
}
-
+
class TReleaseConnectionServer: public THttpServer::ICallBack {
class TRequest: public THttpClientRequestEx {
public:
@@ -686,54 +686,54 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
#if 0
Y_UNIT_TEST(TestSocketsLeak) {
- const bool trueFalse[] = {true, false};
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
- TString res = TestData(25);
- TSleepingServer server(3);
- THttpServer::TOptions options(port);
- options.MaxConnections = 1;
- options.MaxQueueSize = 1;
- options.MaxFQueueSize = 2;
- options.nFThreads = 2;
- options.KeepAliveEnabled = true;
- options.RejectExcessConnections = true;
- THttpServer srv(&server, options);
- UNIT_ASSERT(srv.Start());
-
- for (bool keepAlive : trueFalse) {
- server.ResetCounters();
+ const bool trueFalse[] = {true, false};
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+ TString res = TestData(25);
+ TSleepingServer server(3);
+ THttpServer::TOptions options(port);
+ options.MaxConnections = 1;
+ options.MaxQueueSize = 1;
+ options.MaxFQueueSize = 2;
+ options.nFThreads = 2;
+ options.KeepAliveEnabled = true;
+ options.RejectExcessConnections = true;
+ THttpServer srv(&server, options);
+ UNIT_ASSERT(srv.Start());
+
+ for (bool keepAlive : trueFalse) {
+ server.ResetCounters();
TVector<TAutoPtr<IThreadFactory::IThread>> threads;
-
- server.Busy(3);
- server.BusyThread();
-
- for (size_t i = 0; i < 3; ++i) {
+
+ server.Busy(3);
+ server.BusyThread();
+
+ for (size_t i = 0; i < 3; ++i) {
auto func = [&server, port, keepAlive]() {
- server.BusyThread();
+ server.BusyThread();
THolder<TTestRequest> r = MakeHolder<TTestRequest>(port);
- r->KeepAliveConnection = keepAlive;
- r->Execute();
- };
+ r->KeepAliveConnection = keepAlive;
+ r->Execute();
+ };
threads.push_back(SystemThreadFactory()->Run(func));
- }
-
- server.FreeThread(); // all threads get connection & go to processing
- Sleep(TDuration::MilliSeconds(100));
- server.BusyThread(); // we wait while connections are established by the
- // system and accepted by the server
+ }
+
+ server.FreeThread(); // all threads get connection & go to processing
+ Sleep(TDuration::MilliSeconds(100));
+ server.BusyThread(); // we wait while connections are established by the
+ // system and accepted by the server
server.Free(3); // we release all connections processing
-
- for (auto&& thread : threads) {
- thread->Join();
- }
-
- server.Free(3);
- server.FreeThread();
-
- UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount()));
- UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount()));
- }
- }
+
+ for (auto&& thread : threads) {
+ thread->Join();
+ }
+
+ server.Free(3);
+ server.FreeThread();
+
+ UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount()));
+ UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount()));
+ }
+ }
#endif
}
diff --git a/library/cpp/http/server/options.h b/library/cpp/http/server/options.h
index fae4176ae0..38eda0e5e7 100644
--- a/library/cpp/http/server/options.h
+++ b/library/cpp/http/server/options.h
@@ -44,12 +44,12 @@ public:
return *this;
}
- inline THttpServerOptions& EnableRejectExcessConnections(bool enable) noexcept {
- RejectExcessConnections = enable;
-
- return *this;
- }
-
+ inline THttpServerOptions& EnableRejectExcessConnections(bool enable) noexcept {
+ RejectExcessConnections = enable;
+
+ return *this;
+ }
+
inline THttpServerOptions& EnableReusePort(bool enable) noexcept {
ReusePort = enable;
@@ -148,7 +148,7 @@ public:
bool KeepAliveEnabled = true;
bool CompressionEnabled = false;
- bool RejectExcessConnections = false;
+ bool RejectExcessConnections = false;
bool ReusePort = false; // set SO_REUSEPORT socket option
bool ReuseAddress = true; // set SO_REUSEADDR socket option
TAddrs BindSockaddr;