diff options
author | nohttp <nohttp@yandex-team.ru> | 2022-02-10 16:50:31 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:31 +0300 |
commit | dbdd851418e92f8821ee1b1041352d66e3bde170 (patch) | |
tree | c51433a8aa6b29260a3fbf22c9e9272b2c792c83 /library/cpp | |
parent | 8de5e9fef85b2ab655e3bc1d77ee2674f417cd15 (diff) | |
download | ydb-dbdd851418e92f8821ee1b1041352d66e3bde170.tar.gz |
Restoring authorship annotation for <nohttp@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/messagebus/remote_client_connection.cpp | 14 | ||||
-rw-r--r-- | library/cpp/messagebus/remote_connection.cpp | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/session.cpp | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/session_impl.cpp | 28 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/example.cpp | 16 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/messagebus_ut.cpp | 252 |
6 files changed, 159 insertions, 159 deletions
diff --git a/library/cpp/messagebus/remote_client_connection.cpp b/library/cpp/messagebus/remote_client_connection.cpp index 8c7a6db3a8..977f53af7d 100644 --- a/library/cpp/messagebus/remote_client_connection.cpp +++ b/library/cpp/messagebus/remote_client_connection.cpp @@ -67,7 +67,7 @@ void TRemoteClientConnection::TryConnect() { if (!WriterData.Channel) { if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) { - DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); + DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); return; } LastConnectAttempt = now; @@ -77,11 +77,11 @@ void TRemoteClientConnection::TryConnect() { } if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) { - // TryConnect is called from Writer::Act, which is called in cycle - // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects. - return; - } - + // TryConnect is called from Writer::Act, which is called in cycle + // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects. + return; + } + ++WriterData.Status.ConnectSyscalls; int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len()); @@ -113,7 +113,7 @@ void TRemoteClientConnection::TryConnect() { WriterData.Status.Connected = false; WriterData.Status.ConnectError = err; - DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); + DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); } } } diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp index 22932569db..915947b193 100644 --- a/library/cpp/messagebus/remote_connection.cpp +++ b/library/cpp/messagebus/remote_connection.cpp @@ -572,9 +572,9 @@ namespace NBus { BeforeTryWrite(); WriterFillInFlight(); - + WriterGetReconnectQueue()->DequeueAllLikelyEmpty(); - + if (!WriterData.Status.Connected) { TryConnect(); } else { @@ -584,7 +584,7 @@ namespace NBus { GetWriterActor()->AddTaskFromActorLoop(); break; } - + if (WriterData.State == WRITER_FILLING) { WriterFillBuffer(); diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp index 46a7ece6a8..afeceec54d 100644 --- a/library/cpp/messagebus/session.cpp +++ b/library/cpp/messagebus/session.cpp @@ -59,7 +59,7 @@ namespace NBus { // '[]' and '[<address>' are errors. return false; } - + *hostName = host.substr(1, pos - 1); pos++; diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp index ddf9f360c4..d9163d3f3a 100644 --- a/library/cpp/messagebus/session_impl.cpp +++ b/library/cpp/messagebus/session_impl.cpp @@ -211,22 +211,22 @@ void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef } } -size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const { - TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false); - if (!!conn) { - return conn->GetConnectSyscallsNumForTest(); - } else { - return 0; - } -} - +size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const { + TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false); + if (!!conn) { + return conn->GetConnectSyscallsNumForTest(); + } else { + return 0; + } +} + void TBusSessionImpl::GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const { Y_VERIFY(addrs.size() == results.size(), "input.size != output.size"); - for (size_t i = 0; i < addrs.size(); ++i) { - results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]); - } -} - + for (size_t i = 0; i < addrs.size(); ++i) { + results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]); + } +} + void TBusSessionImpl::FillStatus() { } diff --git a/library/cpp/messagebus/test/helper/example.cpp b/library/cpp/messagebus/test/helper/example.cpp index 7c6d704042..5ee0ec3958 100644 --- a/library/cpp/messagebus/test/helper/example.cpp +++ b/library/cpp/messagebus/test/helper/example.cpp @@ -166,24 +166,24 @@ void TExampleClient::WaitReplies() { ResetCounters(); } -EMessageStatus TExampleClient::WaitForError() { +EMessageStatus TExampleClient::WaitForError() { WorkDone.WaitT(TDuration::Seconds(60)); UNIT_ASSERT_VALUES_EQUAL(1, MessageCount); UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount)); UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight()); UNIT_ASSERT_VALUES_EQUAL(1, Errors); - EMessageStatus result = LastError; + EMessageStatus result = LastError; ResetCounters(); - return result; -} - -void TExampleClient::WaitForError(EMessageStatus status) { - EMessageStatus error = WaitForError(); - UNIT_ASSERT_VALUES_EQUAL(status, error); + return result; } +void TExampleClient::WaitForError(EMessageStatus status) { + EMessageStatus error = WaitForError(); + UNIT_ASSERT_VALUES_EQUAL(status, error); +} + void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) { SendMessages(count, addr); WaitReplies(); diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index 040f9b7702..20754561a2 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -98,7 +98,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { THolder<TExampleServer> server; TBusClientSessionConfig clientConfig; - clientConfig.RetryInterval = 0; + clientConfig.RetryInterval = 0; TExampleClient client(clientConfig); server.Reset(new TExampleServer(port, "TExampleServer 1")); @@ -106,25 +106,25 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.SendMessagesWaitReplies(17, serverAddr); server.Destroy(); - - // Making the client to detect disconnection. - client.SendMessages(1, serverAddr); - EMessageStatus error = client.WaitForError(); - if (error == MESSAGE_DELIVERY_FAILED) { - client.SendMessages(1, serverAddr); - error = client.WaitForError(); - } - UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error); - + + // Making the client to detect disconnection. + client.SendMessages(1, serverAddr); + EMessageStatus error = client.WaitForError(); + if (error == MESSAGE_DELIVERY_FAILED) { + client.SendMessages(1, serverAddr); + error = client.WaitForError(); + } + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error); + server.Reset(new TExampleServer(port, "TExampleServer 2")); client.SendMessagesWaitReplies(19, serverAddr); } struct TestNoServerImplClient: public TExampleClient { - TTestSync TestSync; - int failures = 0; - + TTestSync TestSync; + int failures = 0; + template <typename... Args> TestNoServerImplClient(Args&&... args) : TExampleClient(std::forward<Args>(args)...) @@ -132,48 +132,48 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { } ~TestNoServerImplClient() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override { Y_UNUSED(message); - + Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data()); - - TestSync.CheckAndIncrement((failures++) * 2); - } - }; - - void TestNoServerImpl(unsigned port, bool oneWay) { + + TestSync.CheckAndIncrement((failures++) * 2); + } + }; + + void TestNoServerImpl(unsigned port, bool oneWay) { TNetAddr noServerAddr("localhost", port); - TestNoServerImplClient client; - - int count = 0; - for (; count < 200; ++count) { - EMessageStatus status; - if (oneWay) { - status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); - } else { + TestNoServerImplClient client; + + int count = 0; + for (; count < 200; ++count) { + EMessageStatus status; + if (oneWay) { + status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); + } else { TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); - status = client.Session->SendMessageAutoPtr(message, &noServerAddr); - } - + status = client.Session->SendMessageAutoPtr(message, &noServerAddr); + } + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); - - if (count == 0) { - // lame way to wait until it is connected - Sleep(TDuration::MilliSeconds(10)); - } - client.TestSync.WaitForAndIncrement(count * 2 + 1); - } - + + if (count == 0) { + // lame way to wait until it is connected + Sleep(TDuration::MilliSeconds(10)); + } + client.TestSync.WaitForAndIncrement(count * 2 + 1); + } + client.TestSync.WaitForAndIncrement(count * 2); - } - - void HangingServerImpl(unsigned port) { - TNetAddr noServerAddr("localhost", port); - + } + + void HangingServerImpl(unsigned port) { + TNetAddr noServerAddr("localhost", port); + TExampleClient client; int count = 0; @@ -199,13 +199,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { THangingServer server(0); - HangingServerImpl(server.GetPort()); + HangingServerImpl(server.GetPort()); } Y_UNIT_TEST(TestNoServer) { TObjectCountCheck objectCountCheck; - TestNoServerImpl(17, false); + TestNoServerImpl(17, false); } Y_UNIT_TEST(PauseInput) { @@ -746,20 +746,20 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TResetAfterSendMessageOneWayDuringShutdown client; - TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount); - EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr); + TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount); + EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr); UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); - client.TestSync.WaitForAndIncrement(2); - + client.TestSync.WaitForAndIncrement(2); + client.Session->Shutdown(); - ok = client.Session->SendMessageOneWay(message); + ok = client.Session->SendMessageOneWay(message); Y_VERIFY(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data()); - - // check reset is possible here - message->Reset(); - client.TestSync.CheckAndIncrement(3); + + // check reset is possible here + message->Reset(); + client.TestSync.CheckAndIncrement(3); delete message; } @@ -767,7 +767,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) { TObjectCountCheck objectCountCheck; - TestNoServerImpl(17, true); + TestNoServerImpl(17, true); } struct TResetAfterSendOneWaySuccessClient: public TExampleClient { @@ -904,40 +904,40 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { Sync.WaitForAndIncrement(2); } } - + void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override { - // We do not check for message errors in this test. - } - + // We do not check for message errors in this test. + } + void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override { - } + } }; struct TOnConnectionEventServer: public TExampleServer { - TOnConnectionEventServer() + TOnConnectionEventServer() : TExampleServer("TOnConnectionEventServer") { } - + ~TOnConnectionEventServer() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override { - // We do not check for server message errors in this test. - } - }; - + // We do not check for server message errors in this test. + } + }; + Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) { TObjectCountCheck objectCountCheck; - TOnConnectionEventServer server; + TOnConnectionEventServer server; TOnConnectionEventClient client; TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort()); - client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); + client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); client.Sync.WaitForAndIncrement(1); @@ -949,12 +949,12 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) { TObjectCountCheck objectCountCheck; - THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer); + THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer); TOnConnectionEventClient client; TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort()); - client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); + client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); client.Sync.WaitForAndIncrement(1); @@ -1059,66 +1059,66 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.WaitReplies(); server.WaitForOnMessageCount(test_msg_count); - }; - + }; + Y_UNIT_TEST(TestConnectionAttempts) { - TObjectCountCheck objectCountCheck; - - TNetAddr noServerAddr("localhost", 17); - TBusClientSessionConfig clientConfig; + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + TBusClientSessionConfig clientConfig; clientConfig.RetryInterval = 100; TestNoServerImplClient client(clientConfig); - - int count = 0; - for (; count < 10; ++count) { - EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), - &noServerAddr); - + + int count = 0; + for (; count < 10; ++count) { + EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), + &noServerAddr); + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); - client.TestSync.WaitForAndIncrement(count * 2 + 1); - - // First connection attempt is for connect call; second one is to get connect result. - UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); - } - Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval)); - for (; count < 10; ++count) { - EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), - &noServerAddr); - + client.TestSync.WaitForAndIncrement(count * 2 + 1); + + // First connection attempt is for connect call; second one is to get connect result. + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + } + Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval)); + for (; count < 10; ++count) { + EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), + &noServerAddr); + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); - client.TestSync.WaitForAndIncrement(count * 2 + 1); - - // First connection attempt is for connect call; second one is to get connect result. - UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4); - } - }; - + client.TestSync.WaitForAndIncrement(count * 2 + 1); + + // First connection attempt is for connect call; second one is to get connect result. + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4); + } + }; + Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndNotReconnectWhenIdle) { - TObjectCountCheck objectCountCheck; - - TNetAddr noServerAddr("localhost", 17); - TBusClientSessionConfig clientConfig; + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + TBusClientSessionConfig clientConfig; clientConfig.RetryInterval = 100; clientConfig.ReconnectWhenIdle = false; TestNoServerImplClient client(clientConfig); - - int count = 0; - for (; count < 10; ++count) { - EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), - &noServerAddr); - + + int count = 0; + for (; count < 10; ++count) { + EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), + &noServerAddr); + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); - client.TestSync.WaitForAndIncrement(count * 2 + 1); - - // First connection attempt is for connect call; second one is to get connect result. - UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); - } - + client.TestSync.WaitForAndIncrement(count * 2 + 1); + + // First connection attempt is for connect call; second one is to get connect result. + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + } + Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2)); - UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval)); - UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); - }; + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + }; Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndReconnectWhenIdle) { TObjectCountCheck objectCountCheck; |