diff options
author | single <single@yandex-team.ru> | 2022-02-10 16:50:29 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:29 +0300 |
commit | 8ae96df130bbede609c3504aa9af1bc6ff5361b3 (patch) | |
tree | 4751832974bd75ca721269aa54faa15d76032dfb /library/cpp/messagebus/test | |
parent | 5d4e7b7c923852e0f6398791ec98a60cf9faab46 (diff) | |
download | ydb-8ae96df130bbede609c3504aa9af1bc6ff5361b3.tar.gz |
Restoring authorship annotation for <single@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test')
-rw-r--r-- | library/cpp/messagebus/test/ut/messagebus_ut.cpp | 166 |
1 files changed, 83 insertions, 83 deletions
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index 040f9b7702..c11d447224 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -962,103 +962,103 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.Sync.WaitForAndIncrement(3); } - + struct TServerForQuotaWake: public TExampleServer { TSystemEvent GoOn; TMutex OneLock; - - TOnMessageContext OneMessage; - - static TBusServerSessionConfig Config() { - TBusServerSessionConfig config; - - config.PerConnectionMaxInFlight = 1; - config.PerConnectionMaxInFlightBySize = 1500; - config.MaxMessageSize = 1024; - - return config; - } - - TServerForQuotaWake() - : TExampleServer("TServerForQuotaWake", Config()) + + TOnMessageContext OneMessage; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + + config.PerConnectionMaxInFlight = 1; + config.PerConnectionMaxInFlightBySize = 1500; + config.MaxMessageSize = 1024; + + return config; + } + + TServerForQuotaWake() + : TExampleServer("TServerForQuotaWake", Config()) { } - + ~TServerForQuotaWake() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnMessage(TOnMessageContext& req) override { - if (!GoOn.Wait(0)) { + if (!GoOn.Wait(0)) { TGuard<TMutex> guard(OneLock); - - UNIT_ASSERT(!OneMessage); - - OneMessage.Swap(req); - } else - TExampleServer::OnMessage(req); - } - - void WakeOne() { + + UNIT_ASSERT(!OneMessage); + + OneMessage.Swap(req); + } else + TExampleServer::OnMessage(req); + } + + void WakeOne() { TGuard<TMutex> guard(OneLock); - - UNIT_ASSERT(!!OneMessage); - - TExampleServer::OnMessage(OneMessage); - - TOnMessageContext().Swap(OneMessage); - } - }; - + + UNIT_ASSERT(!!OneMessage); + + TExampleServer::OnMessage(OneMessage); + + TOnMessageContext().Swap(OneMessage); + } + }; + Y_UNIT_TEST(WakeReaderOnQuota) { - const size_t test_msg_count = 64; - - TBusClientSessionConfig clientConfig; - - clientConfig.MaxInFlight = test_msg_count; - - TExampleClient client(clientConfig); - TServerForQuotaWake server; - TInstant start; - - client.MessageCount = test_msg_count; - - const NBus::TNetAddr addr = server.GetActualListenAddr(); - - for (unsigned count = 0;;) { - UNIT_ASSERT(count <= test_msg_count); - - TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); - EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr); - - if (status == MESSAGE_OK) { - count++; - - } else if (status == MESSAGE_BUSY) { + const size_t test_msg_count = 64; + + TBusClientSessionConfig clientConfig; + + clientConfig.MaxInFlight = test_msg_count; + + TExampleClient client(clientConfig); + TServerForQuotaWake server; + TInstant start; + + client.MessageCount = test_msg_count; + + const NBus::TNetAddr addr = server.GetActualListenAddr(); + + for (unsigned count = 0;;) { + UNIT_ASSERT(count <= test_msg_count); + + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr); + + if (status == MESSAGE_OK) { + count++; + + } else if (status == MESSAGE_BUSY) { if (count == test_msg_count) { - TInstant now = TInstant::Now(); - + TInstant now = TInstant::Now(); + if (start.GetValue() == 0) { - start = now; - + start = now; + // TODO: properly check that server is blocked } else if (start + TDuration::MilliSeconds(100) < now) { - break; - } - } - - Sleep(TDuration::MilliSeconds(10)); - - } else - UNIT_ASSERT(false); - } - - server.GoOn.Signal(); - server.WakeOne(); - - client.WaitReplies(); - - server.WaitForOnMessageCount(test_msg_count); + break; + } + } + + Sleep(TDuration::MilliSeconds(10)); + + } else + UNIT_ASSERT(false); + } + + server.GoOn.Signal(); + server.WakeOne(); + + client.WaitReplies(); + + server.WaitForOnMessageCount(test_msg_count); }; Y_UNIT_TEST(TestConnectionAttempts) { |