diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-08-16 01:16:29 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-08-16 02:22:26 +0300 |
commit | 36bc9ed652bd6edb13dee861dc1fc958a4a76e9f (patch) | |
tree | 987fb2867f6c49f4f2c915d070aa47ca7ac20c89 | |
parent | d1a4fffc6c0a74a57deba7c00d8a67abe182d9dd (diff) | |
download | ydb-36bc9ed652bd6edb13dee861dc1fc958a4a76e9f.tar.gz |
Intermediate changes
-rw-r--r-- | library/cpp/http/server/http_ut.cpp | 167 |
1 files changed, 68 insertions, 99 deletions
diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp index b570d1e6e3..824741f6db 100644 --- a/library/cpp/http/server/http_ut.cpp +++ b/library/cpp/http/server/http_ut.cpp @@ -8,7 +8,7 @@ #include <util/stream/output.h> #include <util/stream/zlib.h> #include <util/system/datetime.h> -#include <util/system/sem.h> +#include <util/system/mutex.h> Y_UNIT_TEST_SUITE(THttpServerTest) { class TEchoServer: public THttpServer::ICallBack { @@ -62,11 +62,11 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { } bool DoReply(const TReplyParams& params) override { - Server->FreeThread(); - Server->Busy(1); - params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo"); - params.Output.Finish(); - Server->Replies->Inc(); + ++Server->Replies; + with_lock (Server->Lock) { + params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo"); + params.Output.Finish(); + } return true; } @@ -75,65 +75,18 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { }; public: - inline TSleepingServer(unsigned int size) - : Semaphore("conns", size) - , Semaphore2("threads", 1) - , Replies(new TAtomicCounter()) - , MaxConns(new TAtomicCounter()) - { - } - - void ResetCounters() { - Replies.Reset(new TAtomicCounter()); - MaxConns.Reset(new TAtomicCounter()); - } - - long RepliesCount() const { - return Replies->Val(); - } - - long MaxConnsCount() const { - return MaxConns->Val(); - } - TClientRequest* CreateClient() override { return new TReplier(this); } void OnMaxConn() override { - MaxConns->Inc(); - } - - void OnFailRequest(int) override { - FreeThread(); - Busy(1); - } - - void Busy(int count) { - while (count-- > 0) { - Semaphore.Acquire(); - } - } - - void BusyThread() { - Semaphore2.Acquire(); - } - - void Free(int count) { - while (count-- > 0) { - Semaphore.Release(); - } - } - - void FreeThread() { - Semaphore2.Release(); + ++MaxConns; } + public: + TMutex Lock; - private: - TSemaphore Semaphore; - TSemaphore Semaphore2; - THolder<TAtomicCounter> Replies; - THolder<TAtomicCounter> MaxConns; + std::atomic<size_t> Replies; + std::atomic<size_t> MaxConns; }; static const TString CrLf = "\r\n"; @@ -717,56 +670,72 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { server.Stop(); } -#if 0 Y_UNIT_TEST(TestSocketsLeak) { - const bool trueFalse[] = {true, false}; TPortManager portManager; - const ui16 port = portManager.GetPort(); TString res = TestData(25); - TSleepingServer server(3); - THttpServer::TOptions options(port); - options.MaxConnections = 1; - options.MaxQueueSize = 1; - options.MaxFQueueSize = 2; - options.nFThreads = 2; - options.KeepAliveEnabled = true; - options.RejectExcessConnections = true; - THttpServer srv(&server, options); - UNIT_ASSERT(srv.Start()); - - for (bool keepAlive : trueFalse) { - server.ResetCounters(); - TVector<TAutoPtr<IThreadFactory::IThread>> threads; - - server.Busy(3); - server.BusyThread(); - - for (size_t i = 0; i < 3; ++i) { - auto func = [&server, port, keepAlive]() { - server.BusyThread(); - THolder<TTestRequest> r = MakeHolder<TTestRequest>(port); - r->KeepAliveConnection = keepAlive; - r->Execute(); + + const bool trueFalse[] = {true, false}; + + for (bool rejectExcessConnections : trueFalse) { + for (bool keepAlive : trueFalse) { + const ui16 port = portManager.GetPort(); + TSleepingServer server; + THttpServer::TOptions options(port); + options.nThreads = 1; + options.MaxConnections = 1; + options.MaxQueueSize = 10; + options.MaxFQueueSize = 2; + options.nFThreads = 2; + options.KeepAliveEnabled = true; + options.RejectExcessConnections = rejectExcessConnections; + THttpServer srv(&server, options); + + UNIT_ASSERT(srv.Start()); + UNIT_ASSERT(server.Lock.TryAcquire()); + + std::atomic<size_t> threadsFinished = 0; + TVector<THolder<IThreadFactory::IThread>> threads; + auto func = [port, keepAlive, &threadsFinished]() { + try { + TTestRequest r(port); + r.KeepAliveConnection = keepAlive; + r.Execute(); + } catch (...) { + } + ++threadsFinished; }; + threads.push_back(SystemThreadFactory()->Run(func)); - } - server.FreeThread(); // all threads get connection & go to processing - Sleep(TDuration::MilliSeconds(100)); - server.BusyThread(); // we wait while connections are established by the - // system and accepted by the server - server.Free(3); // we release all connections processing + while (server.Replies.load() != 1) { //wait while we have one connection inside server + Sleep(TDuration::MilliSeconds(1)); + } - for (auto&& thread : threads) { - thread->Join(); - } + for (size_t i = 1; i < 3; ++i) { + threads.push_back(SystemThreadFactory()->Run(func)); + + //in case of rejectExcessConnections next requests will fail, otherwise will stuck inside server queue + while ((rejectExcessConnections ? threadsFinished.load() : srv.GetRequestQueueSize()) != i) { + Sleep(TDuration::MilliSeconds(1)); + } + } - server.Free(3); - server.FreeThread(); + server.Lock.Release(); - UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount())); - UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount())); + for (auto&& thread : threads) { + thread->Join(); + } + + TStringStream opts; + opts << " [" << rejectExcessConnections << ", " << keepAlive << "] "; + + UNIT_ASSERT_EQUAL_C(server.MaxConns, 2, opts.Str() + "we should get MaxConn notification 2 times, got " + ToString(server.MaxConns.load())); + if (rejectExcessConnections) { + UNIT_ASSERT_EQUAL_C(server.Replies, 1, opts.Str() + "only one request should have been processed, got " + ToString(server.Replies.load())); + } else { + UNIT_ASSERT_VALUES_EQUAL(server.Replies.load(), 3); + } + } } } -#endif } |