aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-08-16 01:16:29 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-08-16 02:22:26 +0300
commit36bc9ed652bd6edb13dee861dc1fc958a4a76e9f (patch)
tree987fb2867f6c49f4f2c915d070aa47ca7ac20c89
parentd1a4fffc6c0a74a57deba7c00d8a67abe182d9dd (diff)
downloadydb-36bc9ed652bd6edb13dee861dc1fc958a4a76e9f.tar.gz
Intermediate changes
-rw-r--r--library/cpp/http/server/http_ut.cpp167
1 files changed, 68 insertions, 99 deletions
diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp
index b570d1e6e3..824741f6db 100644
--- a/library/cpp/http/server/http_ut.cpp
+++ b/library/cpp/http/server/http_ut.cpp
@@ -8,7 +8,7 @@
#include <util/stream/output.h>
#include <util/stream/zlib.h>
#include <util/system/datetime.h>
-#include <util/system/sem.h>
+#include <util/system/mutex.h>
Y_UNIT_TEST_SUITE(THttpServerTest) {
class TEchoServer: public THttpServer::ICallBack {
@@ -62,11 +62,11 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
}
bool DoReply(const TReplyParams& params) override {
- Server->FreeThread();
- Server->Busy(1);
- params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo");
- params.Output.Finish();
- Server->Replies->Inc();
+ ++Server->Replies;
+ with_lock (Server->Lock) {
+ params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo");
+ params.Output.Finish();
+ }
return true;
}
@@ -75,65 +75,18 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
};
public:
- inline TSleepingServer(unsigned int size)
- : Semaphore("conns", size)
- , Semaphore2("threads", 1)
- , Replies(new TAtomicCounter())
- , MaxConns(new TAtomicCounter())
- {
- }
-
- void ResetCounters() {
- Replies.Reset(new TAtomicCounter());
- MaxConns.Reset(new TAtomicCounter());
- }
-
- long RepliesCount() const {
- return Replies->Val();
- }
-
- long MaxConnsCount() const {
- return MaxConns->Val();
- }
-
TClientRequest* CreateClient() override {
return new TReplier(this);
}
void OnMaxConn() override {
- MaxConns->Inc();
- }
-
- void OnFailRequest(int) override {
- FreeThread();
- Busy(1);
- }
-
- void Busy(int count) {
- while (count-- > 0) {
- Semaphore.Acquire();
- }
- }
-
- void BusyThread() {
- Semaphore2.Acquire();
- }
-
- void Free(int count) {
- while (count-- > 0) {
- Semaphore.Release();
- }
- }
-
- void FreeThread() {
- Semaphore2.Release();
+ ++MaxConns;
}
+ public:
+ TMutex Lock;
- private:
- TSemaphore Semaphore;
- TSemaphore Semaphore2;
- THolder<TAtomicCounter> Replies;
- THolder<TAtomicCounter> MaxConns;
+ std::atomic<size_t> Replies;
+ std::atomic<size_t> MaxConns;
};
static const TString CrLf = "\r\n";
@@ -717,56 +670,72 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
server.Stop();
}
-#if 0
Y_UNIT_TEST(TestSocketsLeak) {
- const bool trueFalse[] = {true, false};
TPortManager portManager;
- const ui16 port = portManager.GetPort();
TString res = TestData(25);
- TSleepingServer server(3);
- THttpServer::TOptions options(port);
- options.MaxConnections = 1;
- options.MaxQueueSize = 1;
- options.MaxFQueueSize = 2;
- options.nFThreads = 2;
- options.KeepAliveEnabled = true;
- options.RejectExcessConnections = true;
- THttpServer srv(&server, options);
- UNIT_ASSERT(srv.Start());
-
- for (bool keepAlive : trueFalse) {
- server.ResetCounters();
- TVector<TAutoPtr<IThreadFactory::IThread>> threads;
-
- server.Busy(3);
- server.BusyThread();
-
- for (size_t i = 0; i < 3; ++i) {
- auto func = [&server, port, keepAlive]() {
- server.BusyThread();
- THolder<TTestRequest> r = MakeHolder<TTestRequest>(port);
- r->KeepAliveConnection = keepAlive;
- r->Execute();
+
+ const bool trueFalse[] = {true, false};
+
+ for (bool rejectExcessConnections : trueFalse) {
+ for (bool keepAlive : trueFalse) {
+ const ui16 port = portManager.GetPort();
+ TSleepingServer server;
+ THttpServer::TOptions options(port);
+ options.nThreads = 1;
+ options.MaxConnections = 1;
+ options.MaxQueueSize = 10;
+ options.MaxFQueueSize = 2;
+ options.nFThreads = 2;
+ options.KeepAliveEnabled = true;
+ options.RejectExcessConnections = rejectExcessConnections;
+ THttpServer srv(&server, options);
+
+ UNIT_ASSERT(srv.Start());
+ UNIT_ASSERT(server.Lock.TryAcquire());
+
+ std::atomic<size_t> threadsFinished = 0;
+ TVector<THolder<IThreadFactory::IThread>> threads;
+ auto func = [port, keepAlive, &threadsFinished]() {
+ try {
+ TTestRequest r(port);
+ r.KeepAliveConnection = keepAlive;
+ r.Execute();
+ } catch (...) {
+ }
+ ++threadsFinished;
};
+
threads.push_back(SystemThreadFactory()->Run(func));
- }
- server.FreeThread(); // all threads get connection & go to processing
- Sleep(TDuration::MilliSeconds(100));
- server.BusyThread(); // we wait while connections are established by the
- // system and accepted by the server
- server.Free(3); // we release all connections processing
+ while (server.Replies.load() != 1) { //wait while we have one connection inside server
+ Sleep(TDuration::MilliSeconds(1));
+ }
- for (auto&& thread : threads) {
- thread->Join();
- }
+ for (size_t i = 1; i < 3; ++i) {
+ threads.push_back(SystemThreadFactory()->Run(func));
+
+ //in case of rejectExcessConnections next requests will fail, otherwise will stuck inside server queue
+ while ((rejectExcessConnections ? threadsFinished.load() : srv.GetRequestQueueSize()) != i) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
+ }
- server.Free(3);
- server.FreeThread();
+ server.Lock.Release();
- UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount()));
- UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount()));
+ for (auto&& thread : threads) {
+ thread->Join();
+ }
+
+ TStringStream opts;
+ opts << " [" << rejectExcessConnections << ", " << keepAlive << "] ";
+
+ UNIT_ASSERT_EQUAL_C(server.MaxConns, 2, opts.Str() + "we should get MaxConn notification 2 times, got " + ToString(server.MaxConns.load()));
+ if (rejectExcessConnections) {
+ UNIT_ASSERT_EQUAL_C(server.Replies, 1, opts.Str() + "only one request should have been processed, got " + ToString(server.Replies.load()));
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(server.Replies.load(), 3);
+ }
+ }
}
}
-#endif
}