diff options
| author | nohttp <[email protected]> | 2022-02-10 16:50:31 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:50:31 +0300 | 
| commit | dbdd851418e92f8821ee1b1041352d66e3bde170 (patch) | |
| tree | c51433a8aa6b29260a3fbf22c9e9272b2c792c83 /library/cpp | |
| parent | 8de5e9fef85b2ab655e3bc1d77ee2674f417cd15 (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/messagebus/remote_client_connection.cpp | 14 | ||||
| -rw-r--r-- | library/cpp/messagebus/remote_connection.cpp | 6 | ||||
| -rw-r--r-- | library/cpp/messagebus/session.cpp | 2 | ||||
| -rw-r--r-- | library/cpp/messagebus/session_impl.cpp | 28 | ||||
| -rw-r--r-- | library/cpp/messagebus/test/helper/example.cpp | 16 | ||||
| -rw-r--r-- | library/cpp/messagebus/test/ut/messagebus_ut.cpp | 252 | 
6 files changed, 159 insertions, 159 deletions
| diff --git a/library/cpp/messagebus/remote_client_connection.cpp b/library/cpp/messagebus/remote_client_connection.cpp index 8c7a6db3a8c..977f53af7d8 100644 --- a/library/cpp/messagebus/remote_client_connection.cpp +++ b/library/cpp/messagebus/remote_client_connection.cpp @@ -67,7 +67,7 @@ void TRemoteClientConnection::TryConnect() {      if (!WriterData.Channel) {          if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) { -            DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); +            DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);               return;          }          LastConnectAttempt = now; @@ -77,11 +77,11 @@ void TRemoteClientConnection::TryConnect() {      }      if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) { -        // TryConnect is called from Writer::Act, which is called in cycle -        // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects. -        return; -    } - +        // TryConnect is called from Writer::Act, which is called in cycle  +        // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects.  +        return;  +    }  +       ++WriterData.Status.ConnectSyscalls;      int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len()); @@ -113,7 +113,7 @@ void TRemoteClientConnection::TryConnect() {              WriterData.Status.Connected = false;              WriterData.Status.ConnectError = err; -            DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); +            DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);           }      }  } diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp index 22932569dbd..915947b1934 100644 --- a/library/cpp/messagebus/remote_connection.cpp +++ b/library/cpp/messagebus/remote_connection.cpp @@ -572,9 +572,9 @@ namespace NBus {              BeforeTryWrite();              WriterFillInFlight(); - +               WriterGetReconnectQueue()->DequeueAllLikelyEmpty(); - +               if (!WriterData.Status.Connected) {                  TryConnect();              } else { @@ -584,7 +584,7 @@ namespace NBus {                          GetWriterActor()->AddTaskFromActorLoop();                          break;                      } - +                       if (WriterData.State == WRITER_FILLING) {                          WriterFillBuffer(); diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp index 46a7ece6a8f..afeceec54d3 100644 --- a/library/cpp/messagebus/session.cpp +++ b/library/cpp/messagebus/session.cpp @@ -59,7 +59,7 @@ namespace NBus {                  // '[]' and '[<address>' are errors.                  return false;              } - +               *hostName = host.substr(1, pos - 1);              pos++; diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp index ddf9f360c43..d9163d3f3ac 100644 --- a/library/cpp/messagebus/session_impl.cpp +++ b/library/cpp/messagebus/session_impl.cpp @@ -211,22 +211,22 @@ void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef      }  } -size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const { -    TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false); -    if (!!conn) { -        return conn->GetConnectSyscallsNumForTest(); -    } else { -        return 0; -    } -} - +size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const {  +    TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);  +    if (!!conn) {  +        return conn->GetConnectSyscallsNumForTest();  +    } else {  +        return 0;  +    }  +}  +   void TBusSessionImpl::GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {      Y_VERIFY(addrs.size() == results.size(), "input.size != output.size"); -    for (size_t i = 0; i < addrs.size(); ++i) { -        results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]); -    } -} - +    for (size_t i = 0; i < addrs.size(); ++i) {  +        results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]);  +    }  +}  +   void TBusSessionImpl::FillStatus() {  } diff --git a/library/cpp/messagebus/test/helper/example.cpp b/library/cpp/messagebus/test/helper/example.cpp index 7c6d704042c..5ee0ec3958b 100644 --- a/library/cpp/messagebus/test/helper/example.cpp +++ b/library/cpp/messagebus/test/helper/example.cpp @@ -166,24 +166,24 @@ void TExampleClient::WaitReplies() {      ResetCounters();  } -EMessageStatus TExampleClient::WaitForError() { +EMessageStatus TExampleClient::WaitForError() {       WorkDone.WaitT(TDuration::Seconds(60));      UNIT_ASSERT_VALUES_EQUAL(1, MessageCount);      UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount));      UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight());      UNIT_ASSERT_VALUES_EQUAL(1, Errors); -    EMessageStatus result = LastError; +    EMessageStatus result = LastError;       ResetCounters(); -    return result; -} - -void TExampleClient::WaitForError(EMessageStatus status) { -    EMessageStatus error = WaitForError(); -    UNIT_ASSERT_VALUES_EQUAL(status, error); +    return result;   } +void TExampleClient::WaitForError(EMessageStatus status) {  +    EMessageStatus error = WaitForError();  +    UNIT_ASSERT_VALUES_EQUAL(status, error);  +}  +   void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) {      SendMessages(count, addr);      WaitReplies(); diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index 040f9b77022..20754561a2a 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -98,7 +98,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {          THolder<TExampleServer> server;          TBusClientSessionConfig clientConfig; -        clientConfig.RetryInterval = 0; +        clientConfig.RetryInterval = 0;           TExampleClient client(clientConfig);          server.Reset(new TExampleServer(port, "TExampleServer 1")); @@ -106,25 +106,25 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {          client.SendMessagesWaitReplies(17, serverAddr);          server.Destroy(); - -        // Making the client to detect disconnection. -        client.SendMessages(1, serverAddr); -        EMessageStatus error = client.WaitForError(); -        if (error == MESSAGE_DELIVERY_FAILED) { -            client.SendMessages(1, serverAddr); -            error = client.WaitForError(); -        } -        UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error); - +  +        // Making the client to detect disconnection.  +        client.SendMessages(1, serverAddr);  +        EMessageStatus error = client.WaitForError();  +        if (error == MESSAGE_DELIVERY_FAILED) {  +            client.SendMessages(1, serverAddr);  +            error = client.WaitForError();  +        }  +        UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error);  +           server.Reset(new TExampleServer(port, "TExampleServer 2"));          client.SendMessagesWaitReplies(19, serverAddr);      }      struct TestNoServerImplClient: public TExampleClient { -        TTestSync TestSync; -        int failures = 0; - +        TTestSync TestSync;  +        int failures = 0;  +           template <typename... Args>          TestNoServerImplClient(Args&&... args)              : TExampleClient(std::forward<Args>(args)...) @@ -132,48 +132,48 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {          }          ~TestNoServerImplClient() override { -            Session->Shutdown(); -        } - +            Session->Shutdown();  +        }  +           void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override {              Y_UNUSED(message); - +               Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data()); - -            TestSync.CheckAndIncrement((failures++) * 2); -        } -    }; - -    void TestNoServerImpl(unsigned port, bool oneWay) { +  +            TestSync.CheckAndIncrement((failures++) * 2);  +        }  +    };  +  +    void TestNoServerImpl(unsigned port, bool oneWay) {           TNetAddr noServerAddr("localhost", port); -        TestNoServerImplClient client; - -        int count = 0; -        for (; count < 200; ++count) { -            EMessageStatus status; -            if (oneWay) { -                status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); -            } else { +        TestNoServerImplClient client;  +  +        int count = 0;  +        for (; count < 200; ++count) {  +            EMessageStatus status;  +            if (oneWay) {  +                status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);  +            } else {                   TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); -                status = client.Session->SendMessageAutoPtr(message, &noServerAddr); -            } - +                status = client.Session->SendMessageAutoPtr(message, &noServerAddr);  +            }  +               Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); - -            if (count == 0) { -                // lame way to wait until it is connected -                Sleep(TDuration::MilliSeconds(10)); -            } -            client.TestSync.WaitForAndIncrement(count * 2 + 1); -        } - +  +            if (count == 0) {  +                // lame way to wait until it is connected  +                Sleep(TDuration::MilliSeconds(10));  +            }  +            client.TestSync.WaitForAndIncrement(count * 2 + 1);  +        }  +           client.TestSync.WaitForAndIncrement(count * 2); -    } - -    void HangingServerImpl(unsigned port) { -        TNetAddr noServerAddr("localhost", port); - +    }  +  +    void HangingServerImpl(unsigned port) {  +        TNetAddr noServerAddr("localhost", port);  +           TExampleClient client;          int count = 0; @@ -199,13 +199,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {          THangingServer server(0); -        HangingServerImpl(server.GetPort()); +        HangingServerImpl(server.GetPort());       }      Y_UNIT_TEST(TestNoServer) {          TObjectCountCheck objectCountCheck; -        TestNoServerImpl(17, false); +        TestNoServerImpl(17, false);       }      Y_UNIT_TEST(PauseInput) { @@ -746,20 +746,20 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {          TResetAfterSendMessageOneWayDuringShutdown client; -        TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount); -        EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr); +        TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount);  +        EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr);           UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); -        client.TestSync.WaitForAndIncrement(2); - +        client.TestSync.WaitForAndIncrement(2);  +           client.Session->Shutdown(); -        ok = client.Session->SendMessageOneWay(message); +        ok = client.Session->SendMessageOneWay(message);           Y_VERIFY(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data()); - -        // check reset is possible here -        message->Reset(); -        client.TestSync.CheckAndIncrement(3); +  +        // check reset is possible here  +        message->Reset();  +        client.TestSync.CheckAndIncrement(3);           delete message;      } @@ -767,7 +767,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {      Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) {          TObjectCountCheck objectCountCheck; -        TestNoServerImpl(17, true); +        TestNoServerImpl(17, true);       }      struct TResetAfterSendOneWaySuccessClient: public TExampleClient { @@ -904,40 +904,40 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {                  Sync.WaitForAndIncrement(2);              }          } - +           void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override { -            // We do not check for message errors in this test. -        } - +            // We do not check for message errors in this test.  +        }  +           void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override { -        } +        }       };      struct TOnConnectionEventServer: public TExampleServer { -        TOnConnectionEventServer() +        TOnConnectionEventServer()               : TExampleServer("TOnConnectionEventServer")          {          } - +           ~TOnConnectionEventServer() override { -            Session->Shutdown(); -        } - +            Session->Shutdown();  +        }  +           void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override { -            // We do not check for server message errors in this test. -        } -    }; - +            // We do not check for server message errors in this test.  +        }  +    };  +       Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) {          TObjectCountCheck objectCountCheck; -        TOnConnectionEventServer server; +        TOnConnectionEventServer server;           TOnConnectionEventClient client;          TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort()); -        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); +        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);           client.Sync.WaitForAndIncrement(1); @@ -949,12 +949,12 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {      Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) {          TObjectCountCheck objectCountCheck; -        THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer); +        THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer);           TOnConnectionEventClient client;          TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort()); -        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); +        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);           client.Sync.WaitForAndIncrement(1); @@ -1059,66 +1059,66 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {          client.WaitReplies();          server.WaitForOnMessageCount(test_msg_count); -    }; - +    };  +       Y_UNIT_TEST(TestConnectionAttempts) { -        TObjectCountCheck objectCountCheck; - -        TNetAddr noServerAddr("localhost", 17); -        TBusClientSessionConfig clientConfig; +        TObjectCountCheck objectCountCheck;  +  +        TNetAddr noServerAddr("localhost", 17);  +        TBusClientSessionConfig clientConfig;           clientConfig.RetryInterval = 100;          TestNoServerImplClient client(clientConfig); - -        int count = 0; -        for (; count < 10; ++count) { -            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), -                                                                      &noServerAddr); - +  +        int count = 0;  +        for (; count < 10; ++count) {  +            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),  +                                                                      &noServerAddr);  +               Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); -            client.TestSync.WaitForAndIncrement(count * 2 + 1); - -            // First connection attempt is for connect call; second one is to get connect result. -            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); -        } -        Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval)); -        for (; count < 10; ++count) { -            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), -                                                                      &noServerAddr); - +            client.TestSync.WaitForAndIncrement(count * 2 + 1);  +  +            // First connection attempt is for connect call; second one is to get connect result.  +            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);  +        }  +        Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval));  +        for (; count < 10; ++count) {  +            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),  +                                                                      &noServerAddr);  +               Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); -            client.TestSync.WaitForAndIncrement(count * 2 + 1); - -            // First connection attempt is for connect call; second one is to get connect result. -            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4); -        } -    }; - +            client.TestSync.WaitForAndIncrement(count * 2 + 1);  +  +            // First connection attempt is for connect call; second one is to get connect result.  +            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4);  +        }  +    };  +       Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndNotReconnectWhenIdle) { -        TObjectCountCheck objectCountCheck; - -        TNetAddr noServerAddr("localhost", 17); -        TBusClientSessionConfig clientConfig; +        TObjectCountCheck objectCountCheck;  +  +        TNetAddr noServerAddr("localhost", 17);  +        TBusClientSessionConfig clientConfig;           clientConfig.RetryInterval = 100;          clientConfig.ReconnectWhenIdle = false;          TestNoServerImplClient client(clientConfig); - -        int count = 0; -        for (; count < 10; ++count) { -            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), -                                                                      &noServerAddr); - +  +        int count = 0;  +        for (; count < 10; ++count) {  +            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),  +                                                                      &noServerAddr);  +               Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); -            client.TestSync.WaitForAndIncrement(count * 2 + 1); - -            // First connection attempt is for connect call; second one is to get connect result. -            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); -        } - +            client.TestSync.WaitForAndIncrement(count * 2 + 1);  +  +            // First connection attempt is for connect call; second one is to get connect result.  +            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);  +        }  +           Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2)); -        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); +        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);           Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval)); -        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); -    }; +        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);  +    };       Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndReconnectWhenIdle) {          TObjectCountCheck objectCountCheck; | 
