diff options
| author | single <[email protected]> | 2022-02-10 16:50:29 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:50:29 +0300 | 
| commit | 8ae96df130bbede609c3504aa9af1bc6ff5361b3 (patch) | |
| tree | 4751832974bd75ca721269aa54faa15d76032dfb /library/cpp/messagebus/test/ut/messagebus_ut.cpp | |
| parent | 5d4e7b7c923852e0f6398791ec98a60cf9faab46 (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/ut/messagebus_ut.cpp')
| -rw-r--r-- | library/cpp/messagebus/test/ut/messagebus_ut.cpp | 166 | 
1 files changed, 83 insertions, 83 deletions
| diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index 040f9b77022..c11d447224f 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -962,103 +962,103 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {          client.Sync.WaitForAndIncrement(3);      } - +       struct TServerForQuotaWake: public TExampleServer {          TSystemEvent GoOn;          TMutex OneLock; - -        TOnMessageContext OneMessage; - -        static TBusServerSessionConfig Config() { -            TBusServerSessionConfig config; - -            config.PerConnectionMaxInFlight = 1; -            config.PerConnectionMaxInFlightBySize = 1500; -            config.MaxMessageSize = 1024; - -            return config; -        } - -        TServerForQuotaWake() -            : TExampleServer("TServerForQuotaWake", Config()) +  +        TOnMessageContext OneMessage;  +  +        static TBusServerSessionConfig Config() {  +            TBusServerSessionConfig config;  +  +            config.PerConnectionMaxInFlight = 1;  +            config.PerConnectionMaxInFlightBySize = 1500;  +            config.MaxMessageSize = 1024;  +  +            return config;  +        }  +  +        TServerForQuotaWake()  +            : TExampleServer("TServerForQuotaWake", Config())           {          } - +           ~TServerForQuotaWake() override { -            Session->Shutdown(); -        } - +            Session->Shutdown();  +        }  +           void OnMessage(TOnMessageContext& req) override { -            if (!GoOn.Wait(0)) { +            if (!GoOn.Wait(0)) {                   TGuard<TMutex> guard(OneLock); - -                UNIT_ASSERT(!OneMessage); - -                OneMessage.Swap(req); -            } else -                TExampleServer::OnMessage(req); -        } - -        void WakeOne() { +  +                UNIT_ASSERT(!OneMessage);  +  +                OneMessage.Swap(req);  +            } else  +                TExampleServer::OnMessage(req);  +        }  +  +        void WakeOne() {               TGuard<TMutex> guard(OneLock); - -            UNIT_ASSERT(!!OneMessage); - -            TExampleServer::OnMessage(OneMessage); - -            TOnMessageContext().Swap(OneMessage); -        } -    }; - +  +            UNIT_ASSERT(!!OneMessage);  +  +            TExampleServer::OnMessage(OneMessage);  +  +            TOnMessageContext().Swap(OneMessage);  +        }  +    };  +       Y_UNIT_TEST(WakeReaderOnQuota) { -        const size_t test_msg_count = 64; - -        TBusClientSessionConfig clientConfig; - -        clientConfig.MaxInFlight = test_msg_count; - -        TExampleClient client(clientConfig); -        TServerForQuotaWake server; -        TInstant start; - -        client.MessageCount = test_msg_count; - -        const NBus::TNetAddr addr = server.GetActualListenAddr(); - -        for (unsigned count = 0;;) { -            UNIT_ASSERT(count <= test_msg_count); - -            TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); -            EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr); - -            if (status == MESSAGE_OK) { -                count++; - -            } else if (status == MESSAGE_BUSY) { +        const size_t test_msg_count = 64;  +  +        TBusClientSessionConfig clientConfig;  +  +        clientConfig.MaxInFlight = test_msg_count;  +  +        TExampleClient client(clientConfig);  +        TServerForQuotaWake server;  +        TInstant start;  +  +        client.MessageCount = test_msg_count;  +  +        const NBus::TNetAddr addr = server.GetActualListenAddr();  +  +        for (unsigned count = 0;;) {  +            UNIT_ASSERT(count <= test_msg_count);  +  +            TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));  +            EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr);  +  +            if (status == MESSAGE_OK) {  +                count++;  +  +            } else if (status == MESSAGE_BUSY) {                   if (count == test_msg_count) { -                    TInstant now = TInstant::Now(); - +                    TInstant now = TInstant::Now();  +                       if (start.GetValue() == 0) { -                        start = now; - +                        start = now;  +                           // TODO: properly check that server is blocked                      } else if (start + TDuration::MilliSeconds(100) < now) { -                        break; -                    } -                } - -                Sleep(TDuration::MilliSeconds(10)); - -            } else -                UNIT_ASSERT(false); -        } - -        server.GoOn.Signal(); -        server.WakeOne(); - -        client.WaitReplies(); - -        server.WaitForOnMessageCount(test_msg_count); +                        break;  +                    }  +                }  +  +                Sleep(TDuration::MilliSeconds(10));  +  +            } else  +                UNIT_ASSERT(false);  +        }  +  +        server.GoOn.Signal();  +        server.WakeOne();  +  +        client.WaitReplies();  +  +        server.WaitForOnMessageCount(test_msg_count);       };      Y_UNIT_TEST(TestConnectionAttempts) { | 
