aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/messagebus/test/ut/messagebus_ut.cpp
blob: b333e2fe23fddef8bf823ff69513d553598a8f41 (plain) (tree)
1
2
3
4
5
6
7
8
9
                                                  
 



                                                                  
 
                                                  

                              
                  
                     
                            







                                                                   
         
 
                                                     


                                                                                        
                                                                     












                                                   
                                      
                                                                                 
                                                                                















                                                                          
                                   

                                                                         
                                           

                                                                        
                                                  

                                                                        
                            





                                                      
                                 
































                                                                   
                                                          

                           



                                                         
 
                                            


                                                                                     
                               
 
                                                                                                                          














                                                                                                                           
                                                                                               

                                                                                    
                                                                                                  






                                                               
                                                       























                                                                                               
                                     





                                            
                                



                                           
                              






























                                                               
                                                                    












                                                                                         
         






                                                                             
                                                                                                                               














                                                                                
                                                            

                                                 
                                                             

                                                  
                                                       

















                                                                                           
                                                            













                                                                                  
                                                                                   







                                                                          
                                                            






                                        
                                                                 


























                                                                                        
                            
                                                                                                           

         
                                                          














































                                                                                                  
                                                                           

         
                              







                                                                                                         
                                         





















                                                                         
                                                              








                                                                    
         
 
                                                


















                                                                                 
                                                                     

         
                                          


















                                                                   
                                                             








                                                                   
         
 
                                               








                                                                                                
                                 


             
                                         















                                                                                     
                                          

















                                                                        
                                        















                                                                                   
                                            






















                                                                                                    
                                                 






























                                                                                      
                                                                              














                                                                     
                                                                



                                                                                  
                                                                                                                                                  



                                          
                                                       










                                                                                                                                  
                                                                              
                           
                                                                




                                                                                     
                                                                                                                          




                                                      
                                        



                                          
                                                             













                                                                                      
                                                                                                                       






                                             
                                                     



                                           
                                                                      
                           
                                                        








                                                                       
                                               













                                                                                                                            
                             















                                                                        
                                    








                                                                                    
                                    



                                                                 
                                                                








                                                       
                                      
























                                                                                             
                                                                                        






                                                                              
                                                            
                       
                                              

























                                                                                    
                                                            
                                  

                                                        
 
                                              






                                                                      
                                                    















                                                                                                  
                                                      














                                                                                                  
                                                       
                          
                       













                                                             
         
 
                                         



                                                         
                                              







                                               
                                          







                                                  
                                     






















                                                                                           
                                              
                                                   
                                                
                                    
                                                                      

















                                                                            
                                          










                                                                                                                      
                                                                                                  








                                                                                                                      
                                                                                                  





                                                                                                 
                                                                             











                                                                                                                      
                                                                                                  










                                                                                                 
                                                                          











                                                                                                                      
                                                                                                  












                                                                                                    
#include <library/cpp/testing/unittest/registar.h>

#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/fixed_port.h>
#include <library/cpp/messagebus/test/helper/hanging_server.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
#include <library/cpp/messagebus/test/helper/wait_for.h>

#include <library/cpp/messagebus/misc/test_sync.h>

#include <util/network/sock.h>

#include <utility>

using namespace NBus;
using namespace NBus::NTest;

namespace {
    struct TExampleClientSlowOnMessageSent: public TExampleClient {
        TAtomic SentCompleted;

        TSystemEvent ReplyReceived;

        TExampleClientSlowOnMessageSent()
            : SentCompleted(0)
        {
        }

        ~TExampleClientSlowOnMessageSent() override {
            Session->Shutdown();
        }

        void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
            Y_VERIFY(AtomicGet(SentCompleted), "must be completed"); 

            TExampleClient::OnReply(mess, reply);

            ReplyReceived.Signal();
        }

        void OnMessageSent(TBusMessage*) override {
            Sleep(TDuration::MilliSeconds(100));
            AtomicSet(SentCompleted, 1);
        }
    };

}

Y_UNIT_TEST_SUITE(TMessageBusTests) { 
    void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply,
                                 const TBusServerSessionConfig& sessionConfig) {
        TObjectCountCheck objectCountCheck;

        TExampleServer server;

        TExampleClient client(sessionConfig);
        client.CrashOnError = true;

        server.UseCompression = useCompression;
        client.UseCompression = useCompression;

        server.AckMessageBeforeSendReply = ackMessageBeforeReply;

        client.SendMessagesWaitReplies(100, server.GetActualListenAddr());
        UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0);
        UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0);
    }

    Y_UNIT_TEST(TestDestination) { 
        TestDestinationTemplate(false, false, TBusServerSessionConfig());
    }

    Y_UNIT_TEST(TestDestinationUsingAck) { 
        TestDestinationTemplate(false, true, TBusServerSessionConfig());
    }

    Y_UNIT_TEST(TestDestinationWithCompression) { 
        TestDestinationTemplate(true, false, TBusServerSessionConfig());
    }

    Y_UNIT_TEST(TestCork) { 
        TBusServerSessionConfig config;
        config.SendThreshold = 1000000000000;
        config.Cork = TDuration::MilliSeconds(10);
        TestDestinationTemplate(false, false, config);
        // TODO: test for cork hanging
    }

    Y_UNIT_TEST(TestReconnect) { 
        if (!IsFixedPortTestAllowed()) {
            return;
        }

        TObjectCountCheck objectCountCheck;

        unsigned port = FixedPort;
        TNetAddr serverAddr("localhost", port);
        THolder<TExampleServer> server;

        TBusClientSessionConfig clientConfig;
        clientConfig.RetryInterval = 0;
        TExampleClient client(clientConfig);

        server.Reset(new TExampleServer(port, "TExampleServer 1"));

        client.SendMessagesWaitReplies(17, serverAddr);

        server.Destroy();

        // Making the client to detect disconnection.
        client.SendMessages(1, serverAddr);
        EMessageStatus error = client.WaitForError();
        if (error == MESSAGE_DELIVERY_FAILED) {
            client.SendMessages(1, serverAddr);
            error = client.WaitForError();
        }
        UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error);

        server.Reset(new TExampleServer(port, "TExampleServer 2"));

        client.SendMessagesWaitReplies(19, serverAddr);
    }

    struct TestNoServerImplClient: public TExampleClient {
        TTestSync TestSync;
        int failures = 0;

        template <typename... Args>
        TestNoServerImplClient(Args&&... args)
            : TExampleClient(std::forward<Args>(args)...)
        {
        }

        ~TestNoServerImplClient() override {
            Session->Shutdown();
        }

        void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override {
            Y_UNUSED(message); 

            Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data());

            TestSync.CheckAndIncrement((failures++) * 2);
        }
    };

    void TestNoServerImpl(unsigned port, bool oneWay) {
        TNetAddr noServerAddr("localhost", port);

        TestNoServerImplClient client;

        int count = 0;
        for (; count < 200; ++count) {
            EMessageStatus status;
            if (oneWay) {
                status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
            } else {
                TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
                status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
            }

            Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());

            if (count == 0) {
                // lame way to wait until it is connected
                Sleep(TDuration::MilliSeconds(10));
            }
            client.TestSync.WaitForAndIncrement(count * 2 + 1);
        }

        client.TestSync.WaitForAndIncrement(count * 2);
    }

    void HangingServerImpl(unsigned port) {
        TNetAddr noServerAddr("localhost", port);

        TExampleClient client;

        int count = 0;
        for (;; ++count) {
            TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
            EMessageStatus status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
            if (status == MESSAGE_BUSY) {
                break;
            }
            UNIT_ASSERT_VALUES_EQUAL(int(MESSAGE_OK), int(status));

            if (count == 0) {
                // lame way to wait until it is connected
                Sleep(TDuration::MilliSeconds(10));
            }
        }

        UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConfig()->MaxInFlight, count);
    }

    Y_UNIT_TEST(TestHangindServer) { 
        TObjectCountCheck objectCountCheck;

        THangingServer server(0);

        HangingServerImpl(server.GetPort());
    }

    Y_UNIT_TEST(TestNoServer) { 
        TObjectCountCheck objectCountCheck;

        TestNoServerImpl(17, false);
    }

    Y_UNIT_TEST(PauseInput) { 
        TObjectCountCheck objectCountCheck;

        TExampleServer server;
        server.Session->PauseInput(true);

        TBusClientSessionConfig clientConfig;
        clientConfig.MaxInFlight = 1000;
        TExampleClient client(clientConfig);

        client.SendMessages(100, server.GetActualListenAddr());

        server.TestSync.Check(0);

        server.Session->PauseInput(false);

        server.TestSync.WaitFor(100);

        client.WaitReplies();

        server.Session->PauseInput(true);

        client.SendMessages(200, server.GetActualListenAddr());

        server.TestSync.Check(100);

        server.Session->PauseInput(false);

        server.TestSync.WaitFor(300);

        client.WaitReplies();
    }

    struct TSendTimeoutCheckerExampleClient: public TExampleClient {
        static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) {
            TBusClientSessionConfig sessionConfig;
            if (periodLessThanConnectTimeout) {
                sessionConfig.SendTimeout = 1;
                sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(50);
            } else {
                sessionConfig.SendTimeout = 50;
                sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
            }
            return sessionConfig;
        }

        TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout)
            : TExampleClient(SessionConfig(periodLessThanConnectTimeout))
        {
        }

        ~TSendTimeoutCheckerExampleClient() override {
            Session->Shutdown();
        }

        TSystemEvent ErrorHappened;

        void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override {
            Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got status: %s", ToString(status).data());
            ErrorHappened.Signal();
        }
    };

    void NoServer_SendTimeout_Callback_Impl(bool periodLessThanConnectTimeout) {
        TObjectCountCheck objectCountCheck;

        TNetAddr serverAddr("localhost", 17);

        TSendTimeoutCheckerExampleClient client(periodLessThanConnectTimeout);

        client.SendMessages(1, serverAddr);

        client.ErrorHappened.WaitI();
    }

    Y_UNIT_TEST(NoServer_SendTimeout_Callback_PeriodLess) { 
        NoServer_SendTimeout_Callback_Impl(true);
    }

    Y_UNIT_TEST(NoServer_SendTimeout_Callback_TimeoutLess) { 
        NoServer_SendTimeout_Callback_Impl(false);
    }

    Y_UNIT_TEST(TestOnReplyCalledAfterOnMessageSent) { 
        TObjectCountCheck objectCountCheck;

        TExampleServer server;
        TNetAddr serverAddr = server.GetActualListenAddr();
        TExampleClientSlowOnMessageSent client;

        TAutoPtr<TExampleRequest> message(new TExampleRequest(&client.Proto.RequestCount));
        EMessageStatus s = client.Session->SendMessageAutoPtr(message, &serverAddr);
        UNIT_ASSERT_EQUAL(s, MESSAGE_OK);

        UNIT_ASSERT(client.ReplyReceived.WaitT(TDuration::Seconds(5)));
    }

    struct TDelayReplyServer: public TBusServerHandlerError {
        TBusMessageQueuePtr Bus;
        TExampleProtocol Proto;
        TSystemEvent MessageReceivedEvent; // 1 wait for 1 message
        TBusServerSessionPtr Session;
        TMutex Lock_;
        TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages;

        TDelayReplyServer()
            : MessageReceivedEvent(TEventResetType::rAuto)
        {
            Bus = CreateMessageQueue("TDelayReplyServer");
            TBusServerSessionConfig sessionConfig;
            sessionConfig.SendTimeout = 1000;
            sessionConfig.TotalTimeout = 2001;
            Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
            if (!Session) {
                ythrow yexception() << "Failed to create destination session";
            }
        }

        void OnMessage(TOnMessageContext& mess) override {
            Y_VERIFY(mess.IsConnectionAlive(), "connection should be alive here"); 
            TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext);
            delayedMsg->Swap(mess);
            auto g(Guard(Lock_));
            DelayedMessages.push_back(delayedMsg);
            MessageReceivedEvent.Signal();
        }

        bool CheckClientIsAlive() {
            auto g(Guard(Lock_));
            for (auto& delayedMessage : DelayedMessages) { 
                if (!delayedMessage->IsConnectionAlive()) { 
                    return false;
                }
            }
            return true;
        }

        bool CheckClientIsDead() const {
            auto g(Guard(Lock_));
            for (const auto& delayedMessage : DelayedMessages) { 
                if (delayedMessage->IsConnectionAlive()) { 
                    return false;
                }
            }
            return true;
        }

        void ReplyToDelayedMessages() {
            while (true) {
                TOnMessageContext msg;
                {
                    auto g(Guard(Lock_));
                    if (DelayedMessages.empty()) {
                        break;
                    }
                    DelayedMessages.front()->Swap(msg);
                    DelayedMessages.pop_front();
                }
                TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount));
                msg.SendReplyMove(reply);
            }
        }

        size_t GetDelayedMessageCount() const {
            auto g(Guard(Lock_));
            return DelayedMessages.size();
        }

        void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
            Y_UNUSED(mess); 
            Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed, got %s", ToString(status).data());
        }
    };

    Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) { 
        TObjectCountCheck objectCountCheck;

        TDelayReplyServer server;

        THolder<TExampleClient> client(new TExampleClient);

        client->SendMessages(1, TNetAddr("localhost", server.Session->GetActualListenPort()));

        UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));

        UNIT_ASSERT_VALUES_EQUAL(1, server.Session->GetInFlight());

        client.Destroy();

        UNIT_WAIT_FOR(server.CheckClientIsDead());

        server.ReplyToDelayedMessages();

        // wait until all server message are delivered
        UNIT_WAIT_FOR(0 == server.Session->GetInFlight());
    }

    struct TPackUnpackServer: public TBusServerHandlerError {
        TBusMessageQueuePtr Bus;
        TExampleProtocol Proto;
        TSystemEvent MessageReceivedEvent;
        TSystemEvent ClientDiedEvent;
        TBusServerSessionPtr Session;

        TPackUnpackServer() {
            Bus = CreateMessageQueue("TPackUnpackServer");
            TBusServerSessionConfig sessionConfig;
            Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
        }

        void OnMessage(TOnMessageContext& mess) override {
            TBusIdentity ident;
            mess.AckMessage(ident);

            char packed[BUS_IDENTITY_PACKED_SIZE];
            ident.Pack(packed);
            TBusIdentity resurrected;
            resurrected.Unpack(packed);

            mess.GetSession()->SendReply(resurrected, new TExampleResponse(&Proto.ResponseCount));
        }

        void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
            Y_UNUSED(mess); 
            Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed"); 
        }
    };

    Y_UNIT_TEST(PackUnpack) { 
        TObjectCountCheck objectCountCheck;

        TPackUnpackServer server;

        THolder<TExampleClient> client(new TExampleClient);

        client->SendMessagesWaitReplies(1, TNetAddr("localhost", server.Session->GetActualListenPort()));
    }

    Y_UNIT_TEST(ClientRequestTooLarge) { 
        TObjectCountCheck objectCountCheck;

        TExampleServer server;

        TBusClientSessionConfig clientConfig;
        clientConfig.MaxMessageSize = 100;
        TExampleClient client(clientConfig);

        client.DataSize = 10;
        client.SendMessagesWaitReplies(1, server.GetActualListenAddr());

        client.DataSize = 1000;
        client.SendMessages(1, server.GetActualListenAddr());
        client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE);

        client.DataSize = 20;
        client.SendMessagesWaitReplies(10, server.GetActualListenAddr());

        client.DataSize = 10000;
        client.SendMessages(1, server.GetActualListenAddr());
        client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE);
    }

    struct TServerForResponseTooLarge: public TExampleServer {
        TTestSync TestSync;

        static TBusServerSessionConfig Config() {
            TBusServerSessionConfig config;
            config.MaxMessageSize = 100;
            return config;
        }

        TServerForResponseTooLarge()
            : TExampleServer("TServerForResponseTooLarge", Config())
        {
        }

        ~TServerForResponseTooLarge() override {
            Session->Shutdown();
        }

        void OnMessage(TOnMessageContext& mess) override {
            TAutoPtr<TBusMessage> response;

            if (TestSync.Get() == 0) {
                TestSync.CheckAndIncrement(0);
                response.Reset(new TExampleResponse(&Proto.ResponseCount, 1000));
            } else {
                TestSync.WaitForAndIncrement(3);
                response.Reset(new TExampleResponse(&Proto.ResponseCount, 10));
            }

            mess.SendReplyMove(response);
        }

        void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override {
            TestSync.WaitForAndIncrement(1);

            Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "status"); 
        }
    };

    Y_UNIT_TEST(ServerResponseTooLarge) { 
        TObjectCountCheck objectCountCheck;

        TServerForResponseTooLarge server;

        TExampleClient client;
        client.DataSize = 10;

        client.SendMessages(1, server.GetActualListenAddr());
        server.TestSync.WaitForAndIncrement(2);
        client.ResetCounters();

        client.SendMessages(1, server.GetActualListenAddr());

        client.WorkDone.WaitI();

        server.TestSync.CheckAndIncrement(4);

        UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight());
    }

    struct TServerForRequestTooLarge: public TExampleServer {
        TTestSync TestSync;

        static TBusServerSessionConfig Config() {
            TBusServerSessionConfig config;
            config.MaxMessageSize = 100;
            return config;
        }

        TServerForRequestTooLarge()
            : TExampleServer("TServerForRequestTooLarge", Config())
        {
        }

        ~TServerForRequestTooLarge() override {
            Session->Shutdown();
        }

        void OnMessage(TOnMessageContext& req) override {
            unsigned n = TestSync.Get();
            if (n < 2) {
                TestSync.CheckAndIncrement(n);
                TAutoPtr<TExampleResponse> resp(new TExampleResponse(&Proto.ResponseCount, 10));
                req.SendReplyMove(resp);
            } else {
                Y_FAIL("wrong"); 
            }
        }
    };

    Y_UNIT_TEST(ServerRequestTooLarge) { 
        TObjectCountCheck objectCountCheck;

        TServerForRequestTooLarge server;

        TExampleClient client;
        client.DataSize = 10;

        client.SendMessagesWaitReplies(2, server.GetActualListenAddr());

        server.TestSync.CheckAndIncrement(2);

        client.DataSize = 200;
        client.SendMessages(1, server.GetActualListenAddr());
        // server closes connection, so MESSAGE_DELIVERY_FAILED is returned to client
        client.WaitForError(MESSAGE_DELIVERY_FAILED);
    }

    Y_UNIT_TEST(ClientResponseTooLarge) { 
        TObjectCountCheck objectCountCheck;

        TExampleServer server;

        server.DataSize = 10;

        TBusClientSessionConfig clientSessionConfig;
        clientSessionConfig.MaxMessageSize = 100;
        TExampleClient client(clientSessionConfig);
        client.DataSize = 10;

        client.SendMessagesWaitReplies(3, server.GetActualListenAddr());

        server.DataSize = 1000;

        client.SendMessages(1, server.GetActualListenAddr());
        client.WaitForError(MESSAGE_DELIVERY_FAILED);
    }

    Y_UNIT_TEST(ServerUnknownMessage) { 
        TObjectCountCheck objectCountCheck;

        TExampleServer server;
        TNetAddr serverAddr = server.GetActualListenAddr();

        TExampleClient client;

        client.SendMessagesWaitReplies(2, serverAddr);

        TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount));
        req->GetHeader()->Type = 11;
        client.Session->SendMessageAutoPtr(req, &serverAddr);
        client.MessageCount = 1;

        client.WaitForError(MESSAGE_DELIVERY_FAILED);
    }

    Y_UNIT_TEST(ServerMessageReservedIds) { 
        TObjectCountCheck objectCountCheck;

        TExampleServer server;
        TNetAddr serverAddr = server.GetActualListenAddr();

        TExampleClient client;

        client.SendMessagesWaitReplies(2, serverAddr);

        // This test doens't check 0, 1, YBUS_KEYINVALID because there are asserts() on sending side

        TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount));
        req->GetHeader()->Id = 2;
        client.Session->SendMessageAutoPtr(req, &serverAddr);
        client.MessageCount = 1;
        client.WaitForError(MESSAGE_DELIVERY_FAILED);

        req.Reset(new TExampleRequest(&client.Proto.RequestCount));
        req->GetHeader()->Id = YBUS_KEYLOCAL;
        client.Session->SendMessageAutoPtr(req, &serverAddr);
        client.MessageCount = 1;
        client.WaitForError(MESSAGE_DELIVERY_FAILED);
    }

    Y_UNIT_TEST(TestGetInFlightForDestination) { 
        TObjectCountCheck objectCountCheck;

        TDelayReplyServer server;

        TExampleClient client;

        TNetAddr addr("localhost", server.Session->GetActualListenPort());

        UNIT_ASSERT_VALUES_EQUAL(size_t(0), client.Session->GetInFlight(addr));

        client.SendMessages(2, &addr);

        for (size_t i = 0; i < 5; ++i) {
            // One MessageReceivedEvent indicates one message, we need to wait for two
            UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
            if (server.GetDelayedMessageCount() == 2) {
                break;
            }
        }
        UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2);

        size_t inFlight = client.Session->GetInFlight(addr);
        // 4 is for messagebus1 that adds inFlight counter twice for some reason
        UNIT_ASSERT(inFlight == 2 || inFlight == 4);

        UNIT_ASSERT(server.CheckClientIsAlive());

        server.ReplyToDelayedMessages();

        client.WaitReplies();
    }

    struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient {
        TTestSync TestSync;

        static TBusClientSessionConfig SessionConfig() {
            TBusClientSessionConfig config;
            // 1 ms is not enough when test is running under valgrind
            config.ConnectTimeout = 10;
            config.SendTimeout = 10;
            config.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
            return config;
        }

        TResetAfterSendOneWayErrorInCallbackClient()
            : TExampleClient(SessionConfig())
        {
        }

        ~TResetAfterSendOneWayErrorInCallbackClient() override {
            Session->Shutdown();
        }

        void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
            TestSync.WaitForAndIncrement(0);
            Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "must be connection failed, got %s", ToString(status).data());
            mess.Destroy();
            TestSync.CheckAndIncrement(1);
        }
    };

    Y_UNIT_TEST(ResetAfterSendOneWayErrorInCallback) { 
        TObjectCountCheck objectCountCheck;

        TNetAddr noServerAddr("localhost", 17);

        TResetAfterSendOneWayErrorInCallbackClient client;

        EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
        UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);

        client.TestSync.WaitForAndIncrement(2);
    }

    struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient {
        TTestSync TestSync;

        ~TResetAfterSendMessageOneWayDuringShutdown() override {
            Session->Shutdown();
        }

        void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override {
            TestSync.CheckAndIncrement(0);

            Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data());

            // check reset is possible here
            message->Reset();

            // intentionally don't destroy the message
            // we will try to resend it
            Y_UNUSED(message.Release());

            TestSync.CheckAndIncrement(1);
        }
    };

    Y_UNIT_TEST(ResetAfterSendMessageOneWayDuringShutdown) { 
        TObjectCountCheck objectCountCheck;

        TNetAddr noServerAddr("localhost", 17);

        TResetAfterSendMessageOneWayDuringShutdown client;

        TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount);
        EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr);
        UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);

        client.TestSync.WaitForAndIncrement(2);

        client.Session->Shutdown();

        ok = client.Session->SendMessageOneWay(message);
        Y_VERIFY(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data());

        // check reset is possible here
        message->Reset();
        client.TestSync.CheckAndIncrement(3);

        delete message;
    }

    Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) { 
        TObjectCountCheck objectCountCheck;

        TestNoServerImpl(17, true);
    }

    struct TResetAfterSendOneWaySuccessClient: public TExampleClient {
        TTestSync TestSync;

        ~TResetAfterSendOneWaySuccessClient() override {
            Session->Shutdown();
        }

        void OnMessageSentOneWay(TAutoPtr<TBusMessage> sent) override {
            TestSync.WaitForAndIncrement(0);
            sent->Reset();
            TestSync.CheckAndIncrement(1);
        }
    };

    Y_UNIT_TEST(ResetAfterSendOneWaySuccess) { 
        TObjectCountCheck objectCountCheck;

        TExampleServer server;
        TNetAddr serverAddr = server.GetActualListenAddr();

        TResetAfterSendOneWaySuccessClient client;

        EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &serverAddr);
        UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
        // otherwize message might go to OnError(MESSAGE_SHUTDOWN)
        server.WaitForOnMessageCount(1);

        client.TestSync.WaitForAndIncrement(2);
    }

    Y_UNIT_TEST(GetStatus) { 
        TObjectCountCheck objectCountCheck;

        TExampleServer server;

        TExampleClient client;
        // make sure connected
        client.SendMessagesWaitReplies(3, server.GetActualListenAddr());

        server.Bus->GetStatus();
        server.Bus->GetStatus();
        server.Bus->GetStatus();

        client.Bus->GetStatus();
        client.Bus->GetStatus();
        client.Bus->GetStatus();
    }

    Y_UNIT_TEST(BindOnRandomPort) { 
        TObjectCountCheck objectCountCheck;

        TBusServerSessionConfig serverConfig;
        TExampleServer server;

        TExampleClient client;
        TNetAddr addr(TNetAddr("127.0.0.1", server.Session->GetActualListenPort()));
        client.SendMessagesWaitReplies(3, &addr);
    }

    Y_UNIT_TEST(UnbindOnShutdown) { 
        TBusMessageQueuePtr queue(CreateMessageQueue());

        TExampleProtocol proto;
        TBusServerHandlerError handler;
        TBusServerSessionPtr session = TBusServerSession::Create(
            &proto, &handler, TBusServerSessionConfig(), queue);

        unsigned port = session->GetActualListenPort();
        UNIT_ASSERT(port > 0);

        session->Shutdown();

        // fails is Shutdown() didn't unbind
        THangingServer hangingServer(port);
    }

    Y_UNIT_TEST(VersionNegotiation) { 
        TObjectCountCheck objectCountCheck;

        TExampleServer server;

        TSockAddrInet addr(IpFromString("127.0.0.1"), server.Session->GetActualListenPort());

        TInetStreamSocket socket;
        int r1 = socket.Connect(&addr);
        UNIT_ASSERT(r1 >= 0);

        TStreamSocketOutput output(&socket);

        TBusHeader request;
        Zero(request);
        request.Size = sizeof(request);
        request.SetVersionInternal(0xF); // max
        output.Write(&request, sizeof(request));

        UNIT_ASSERT_VALUES_EQUAL(IsVersionNegotiation(request), true);

        TStreamSocketInput input(&socket);

        TBusHeader response;
        size_t pos = 0;

        while (pos < sizeof(response)) {
            size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos);
            pos += count;
        }

        UNIT_ASSERT_VALUES_EQUAL(sizeof(response), pos);

        UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal());
    }

    struct TOnConnectionEventClient: public TExampleClient {
        TTestSync Sync;

        ~TOnConnectionEventClient() override {
            Session->Shutdown();
        }

        void OnClientConnectionEvent(const TClientConnectionEvent& event) override {
            if (Sync.Get() > 2) {
                // Test OnClientConnectionEvent_Disconnect is broken.
                // Sometimes reconnect happens during server shutdown
                // when acceptor connections is still alive, and
                // server connection is already closed
                return;
            }

            if (event.GetType() == TClientConnectionEvent::CONNECTED) {
                Sync.WaitForAndIncrement(0);
            } else if (event.GetType() == TClientConnectionEvent::DISCONNECTED) {
                Sync.WaitForAndIncrement(2);
            }
        }

        void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
            // We do not check for message errors in this test.
        }

        void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
        }
    };

    struct TOnConnectionEventServer: public TExampleServer {
        TOnConnectionEventServer()
            : TExampleServer("TOnConnectionEventServer")
        {
        }

        ~TOnConnectionEventServer() override {
            Session->Shutdown();
        }

        void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
            // We do not check for server message errors in this test.
        }
    };

    Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) { 
        TObjectCountCheck objectCountCheck;

        TOnConnectionEventServer server;

        TOnConnectionEventClient client;

        TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort());

        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);

        client.Sync.WaitForAndIncrement(1);

        client.Session->Shutdown();

        client.Sync.WaitForAndIncrement(3);
    }

    Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) { 
        TObjectCountCheck objectCountCheck;

        THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer);

        TOnConnectionEventClient client;
        TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort());

        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);

        client.Sync.WaitForAndIncrement(1);

        server.Destroy();

        client.Sync.WaitForAndIncrement(3);
    }

    struct TServerForQuotaWake: public TExampleServer {
        TSystemEvent GoOn;
        TMutex OneLock;

        TOnMessageContext OneMessage;

        static TBusServerSessionConfig Config() {
            TBusServerSessionConfig config;

            config.PerConnectionMaxInFlight = 1;
            config.PerConnectionMaxInFlightBySize = 1500;
            config.MaxMessageSize = 1024;

            return config;
        }

        TServerForQuotaWake()
            : TExampleServer("TServerForQuotaWake", Config())
        {
        }

        ~TServerForQuotaWake() override {
            Session->Shutdown();
        }

        void OnMessage(TOnMessageContext& req) override {
            if (!GoOn.Wait(0)) {
                TGuard<TMutex> guard(OneLock);

                UNIT_ASSERT(!OneMessage);

                OneMessage.Swap(req);
            } else
                TExampleServer::OnMessage(req);
        }

        void WakeOne() {
            TGuard<TMutex> guard(OneLock);

            UNIT_ASSERT(!!OneMessage);

            TExampleServer::OnMessage(OneMessage);

            TOnMessageContext().Swap(OneMessage);
        }
    };

    Y_UNIT_TEST(WakeReaderOnQuota) { 
        const size_t test_msg_count = 64;

        TBusClientSessionConfig clientConfig;

        clientConfig.MaxInFlight = test_msg_count;

        TExampleClient client(clientConfig);
        TServerForQuotaWake server;
        TInstant start;

        client.MessageCount = test_msg_count;

        const NBus::TNetAddr addr = server.GetActualListenAddr();

        for (unsigned count = 0;;) {
            UNIT_ASSERT(count <= test_msg_count);

            TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
            EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr);

            if (status == MESSAGE_OK) {
                count++;

            } else if (status == MESSAGE_BUSY) {
                if (count == test_msg_count) {
                    TInstant now = TInstant::Now();

                    if (start.GetValue() == 0) {
                        start = now;

                        // TODO: properly check that server is blocked
                    } else if (start + TDuration::MilliSeconds(100) < now) {
                        break;
                    }
                }

                Sleep(TDuration::MilliSeconds(10));

            } else
                UNIT_ASSERT(false);
        }

        server.GoOn.Signal();
        server.WakeOne();

        client.WaitReplies();

        server.WaitForOnMessageCount(test_msg_count);
    };

    Y_UNIT_TEST(TestConnectionAttempts) { 
        TObjectCountCheck objectCountCheck;

        TNetAddr noServerAddr("localhost", 17);
        TBusClientSessionConfig clientConfig;
        clientConfig.RetryInterval = 100;
        TestNoServerImplClient client(clientConfig);

        int count = 0;
        for (; count < 10; ++count) {
            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
                                                                      &noServerAddr);

            Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
            client.TestSync.WaitForAndIncrement(count * 2 + 1);

            // First connection attempt is for connect call; second one is to get connect result.
            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
        }
        Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval));
        for (; count < 10; ++count) {
            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
                                                                      &noServerAddr);

            Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
            client.TestSync.WaitForAndIncrement(count * 2 + 1);

            // First connection attempt is for connect call; second one is to get connect result.
            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4);
        }
    };

    Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndNotReconnectWhenIdle) { 
        TObjectCountCheck objectCountCheck;

        TNetAddr noServerAddr("localhost", 17);
        TBusClientSessionConfig clientConfig;
        clientConfig.RetryInterval = 100;
        clientConfig.ReconnectWhenIdle = false;
        TestNoServerImplClient client(clientConfig);

        int count = 0;
        for (; count < 10; ++count) {
            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
                                                                      &noServerAddr);

            Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
            client.TestSync.WaitForAndIncrement(count * 2 + 1);

            // First connection attempt is for connect call; second one is to get connect result.
            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
        }

        Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2));
        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
        Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval));
        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
    };

    Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndReconnectWhenIdle) { 
        TObjectCountCheck objectCountCheck;

        TNetAddr noServerAddr("localhost", 17);
        TBusClientSessionConfig clientConfig;
        clientConfig.ReconnectWhenIdle = true;
        clientConfig.RetryInterval = 100;
        TestNoServerImplClient client(clientConfig);

        int count = 0;
        for (; count < 10; ++count) {
            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
                                                                      &noServerAddr);

            Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
            client.TestSync.WaitForAndIncrement(count * 2 + 1);

            // First connection attempt is for connect call; second one is to get connect result.
            UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
        }

        Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2));
        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
        Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval));
        // it is undeterministic how many reconnects will be during that amount of time
        // but it should occur at least once
        UNIT_ASSERT(client.Session->GetConnectSyscallsNumForTest(noServerAddr) > 2);
    };
};