aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test
diff options
context:
space:
mode:
authorsingle <single@yandex-team.ru>2022-02-10 16:50:29 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:29 +0300
commit8ae96df130bbede609c3504aa9af1bc6ff5361b3 (patch)
tree4751832974bd75ca721269aa54faa15d76032dfb /library/cpp/messagebus/test
parent5d4e7b7c923852e0f6398791ec98a60cf9faab46 (diff)
downloadydb-8ae96df130bbede609c3504aa9af1bc6ff5361b3.tar.gz
Restoring authorship annotation for <single@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test')
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp166
1 files changed, 83 insertions, 83 deletions
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
index 040f9b7702..c11d447224 100644
--- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -962,103 +962,103 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.Sync.WaitForAndIncrement(3);
}
-
+
struct TServerForQuotaWake: public TExampleServer {
TSystemEvent GoOn;
TMutex OneLock;
-
- TOnMessageContext OneMessage;
-
- static TBusServerSessionConfig Config() {
- TBusServerSessionConfig config;
-
- config.PerConnectionMaxInFlight = 1;
- config.PerConnectionMaxInFlightBySize = 1500;
- config.MaxMessageSize = 1024;
-
- return config;
- }
-
- TServerForQuotaWake()
- : TExampleServer("TServerForQuotaWake", Config())
+
+ TOnMessageContext OneMessage;
+
+ static TBusServerSessionConfig Config() {
+ TBusServerSessionConfig config;
+
+ config.PerConnectionMaxInFlight = 1;
+ config.PerConnectionMaxInFlightBySize = 1500;
+ config.MaxMessageSize = 1024;
+
+ return config;
+ }
+
+ TServerForQuotaWake()
+ : TExampleServer("TServerForQuotaWake", Config())
{
}
-
+
~TServerForQuotaWake() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnMessage(TOnMessageContext& req) override {
- if (!GoOn.Wait(0)) {
+ if (!GoOn.Wait(0)) {
TGuard<TMutex> guard(OneLock);
-
- UNIT_ASSERT(!OneMessage);
-
- OneMessage.Swap(req);
- } else
- TExampleServer::OnMessage(req);
- }
-
- void WakeOne() {
+
+ UNIT_ASSERT(!OneMessage);
+
+ OneMessage.Swap(req);
+ } else
+ TExampleServer::OnMessage(req);
+ }
+
+ void WakeOne() {
TGuard<TMutex> guard(OneLock);
-
- UNIT_ASSERT(!!OneMessage);
-
- TExampleServer::OnMessage(OneMessage);
-
- TOnMessageContext().Swap(OneMessage);
- }
- };
-
+
+ UNIT_ASSERT(!!OneMessage);
+
+ TExampleServer::OnMessage(OneMessage);
+
+ TOnMessageContext().Swap(OneMessage);
+ }
+ };
+
Y_UNIT_TEST(WakeReaderOnQuota) {
- const size_t test_msg_count = 64;
-
- TBusClientSessionConfig clientConfig;
-
- clientConfig.MaxInFlight = test_msg_count;
-
- TExampleClient client(clientConfig);
- TServerForQuotaWake server;
- TInstant start;
-
- client.MessageCount = test_msg_count;
-
- const NBus::TNetAddr addr = server.GetActualListenAddr();
-
- for (unsigned count = 0;;) {
- UNIT_ASSERT(count <= test_msg_count);
-
- TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
- EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr);
-
- if (status == MESSAGE_OK) {
- count++;
-
- } else if (status == MESSAGE_BUSY) {
+ const size_t test_msg_count = 64;
+
+ TBusClientSessionConfig clientConfig;
+
+ clientConfig.MaxInFlight = test_msg_count;
+
+ TExampleClient client(clientConfig);
+ TServerForQuotaWake server;
+ TInstant start;
+
+ client.MessageCount = test_msg_count;
+
+ const NBus::TNetAddr addr = server.GetActualListenAddr();
+
+ for (unsigned count = 0;;) {
+ UNIT_ASSERT(count <= test_msg_count);
+
+ TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
+ EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr);
+
+ if (status == MESSAGE_OK) {
+ count++;
+
+ } else if (status == MESSAGE_BUSY) {
if (count == test_msg_count) {
- TInstant now = TInstant::Now();
-
+ TInstant now = TInstant::Now();
+
if (start.GetValue() == 0) {
- start = now;
-
+ start = now;
+
// TODO: properly check that server is blocked
} else if (start + TDuration::MilliSeconds(100) < now) {
- break;
- }
- }
-
- Sleep(TDuration::MilliSeconds(10));
-
- } else
- UNIT_ASSERT(false);
- }
-
- server.GoOn.Signal();
- server.WakeOne();
-
- client.WaitReplies();
-
- server.WaitForOnMessageCount(test_msg_count);
+ break;
+ }
+ }
+
+ Sleep(TDuration::MilliSeconds(10));
+
+ } else
+ UNIT_ASSERT(false);
+ }
+
+ server.GoOn.Signal();
+ server.WakeOne();
+
+ client.WaitReplies();
+
+ server.WaitForOnMessageCount(test_msg_count);
};
Y_UNIT_TEST(TestConnectionAttempts) {