aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornohttp <nohttp@yandex-team.ru>2022-02-10 16:50:31 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:31 +0300
commitdbdd851418e92f8821ee1b1041352d66e3bde170 (patch)
treec51433a8aa6b29260a3fbf22c9e9272b2c792c83
parent8de5e9fef85b2ab655e3bc1d77ee2674f417cd15 (diff)
downloadydb-dbdd851418e92f8821ee1b1041352d66e3bde170.tar.gz
Restoring authorship annotation for <nohttp@yandex-team.ru>. Commit 1 of 2.
-rw-r--r--library/cpp/messagebus/remote_client_connection.cpp14
-rw-r--r--library/cpp/messagebus/remote_connection.cpp6
-rw-r--r--library/cpp/messagebus/session.cpp2
-rw-r--r--library/cpp/messagebus/session_impl.cpp28
-rw-r--r--library/cpp/messagebus/test/helper/example.cpp16
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp252
6 files changed, 159 insertions, 159 deletions
diff --git a/library/cpp/messagebus/remote_client_connection.cpp b/library/cpp/messagebus/remote_client_connection.cpp
index 8c7a6db3a8..977f53af7d 100644
--- a/library/cpp/messagebus/remote_client_connection.cpp
+++ b/library/cpp/messagebus/remote_client_connection.cpp
@@ -67,7 +67,7 @@ void TRemoteClientConnection::TryConnect() {
if (!WriterData.Channel) {
if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) {
- DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
+ DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
return;
}
LastConnectAttempt = now;
@@ -77,11 +77,11 @@ void TRemoteClientConnection::TryConnect() {
}
if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) {
- // TryConnect is called from Writer::Act, which is called in cycle
- // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects.
- return;
- }
-
+ // TryConnect is called from Writer::Act, which is called in cycle
+ // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects.
+ return;
+ }
+
++WriterData.Status.ConnectSyscalls;
int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len());
@@ -113,7 +113,7 @@ void TRemoteClientConnection::TryConnect() {
WriterData.Status.Connected = false;
WriterData.Status.ConnectError = err;
- DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
+ DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
}
}
}
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp
index 22932569db..915947b193 100644
--- a/library/cpp/messagebus/remote_connection.cpp
+++ b/library/cpp/messagebus/remote_connection.cpp
@@ -572,9 +572,9 @@ namespace NBus {
BeforeTryWrite();
WriterFillInFlight();
-
+
WriterGetReconnectQueue()->DequeueAllLikelyEmpty();
-
+
if (!WriterData.Status.Connected) {
TryConnect();
} else {
@@ -584,7 +584,7 @@ namespace NBus {
GetWriterActor()->AddTaskFromActorLoop();
break;
}
-
+
if (WriterData.State == WRITER_FILLING) {
WriterFillBuffer();
diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp
index 46a7ece6a8..afeceec54d 100644
--- a/library/cpp/messagebus/session.cpp
+++ b/library/cpp/messagebus/session.cpp
@@ -59,7 +59,7 @@ namespace NBus {
// '[]' and '[<address>' are errors.
return false;
}
-
+
*hostName = host.substr(1, pos - 1);
pos++;
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp
index ddf9f360c4..d9163d3f3a 100644
--- a/library/cpp/messagebus/session_impl.cpp
+++ b/library/cpp/messagebus/session_impl.cpp
@@ -211,22 +211,22 @@ void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef
}
}
-size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const {
- TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
- if (!!conn) {
- return conn->GetConnectSyscallsNumForTest();
- } else {
- return 0;
- }
-}
-
+size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const {
+ TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
+ if (!!conn) {
+ return conn->GetConnectSyscallsNumForTest();
+ } else {
+ return 0;
+ }
+}
+
void TBusSessionImpl::GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
- for (size_t i = 0; i < addrs.size(); ++i) {
- results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]);
- }
-}
-
+ for (size_t i = 0; i < addrs.size(); ++i) {
+ results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]);
+ }
+}
+
void TBusSessionImpl::FillStatus() {
}
diff --git a/library/cpp/messagebus/test/helper/example.cpp b/library/cpp/messagebus/test/helper/example.cpp
index 7c6d704042..5ee0ec3958 100644
--- a/library/cpp/messagebus/test/helper/example.cpp
+++ b/library/cpp/messagebus/test/helper/example.cpp
@@ -166,24 +166,24 @@ void TExampleClient::WaitReplies() {
ResetCounters();
}
-EMessageStatus TExampleClient::WaitForError() {
+EMessageStatus TExampleClient::WaitForError() {
WorkDone.WaitT(TDuration::Seconds(60));
UNIT_ASSERT_VALUES_EQUAL(1, MessageCount);
UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount));
UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight());
UNIT_ASSERT_VALUES_EQUAL(1, Errors);
- EMessageStatus result = LastError;
+ EMessageStatus result = LastError;
ResetCounters();
- return result;
-}
-
-void TExampleClient::WaitForError(EMessageStatus status) {
- EMessageStatus error = WaitForError();
- UNIT_ASSERT_VALUES_EQUAL(status, error);
+ return result;
}
+void TExampleClient::WaitForError(EMessageStatus status) {
+ EMessageStatus error = WaitForError();
+ UNIT_ASSERT_VALUES_EQUAL(status, error);
+}
+
void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) {
SendMessages(count, addr);
WaitReplies();
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
index 040f9b7702..20754561a2 100644
--- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -98,7 +98,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
THolder<TExampleServer> server;
TBusClientSessionConfig clientConfig;
- clientConfig.RetryInterval = 0;
+ clientConfig.RetryInterval = 0;
TExampleClient client(clientConfig);
server.Reset(new TExampleServer(port, "TExampleServer 1"));
@@ -106,25 +106,25 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.SendMessagesWaitReplies(17, serverAddr);
server.Destroy();
-
- // Making the client to detect disconnection.
- client.SendMessages(1, serverAddr);
- EMessageStatus error = client.WaitForError();
- if (error == MESSAGE_DELIVERY_FAILED) {
- client.SendMessages(1, serverAddr);
- error = client.WaitForError();
- }
- UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error);
-
+
+ // Making the client to detect disconnection.
+ client.SendMessages(1, serverAddr);
+ EMessageStatus error = client.WaitForError();
+ if (error == MESSAGE_DELIVERY_FAILED) {
+ client.SendMessages(1, serverAddr);
+ error = client.WaitForError();
+ }
+ UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error);
+
server.Reset(new TExampleServer(port, "TExampleServer 2"));
client.SendMessagesWaitReplies(19, serverAddr);
}
struct TestNoServerImplClient: public TExampleClient {
- TTestSync TestSync;
- int failures = 0;
-
+ TTestSync TestSync;
+ int failures = 0;
+
template <typename... Args>
TestNoServerImplClient(Args&&... args)
: TExampleClient(std::forward<Args>(args)...)
@@ -132,48 +132,48 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
}
~TestNoServerImplClient() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override {
Y_UNUSED(message);
-
+
Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data());
-
- TestSync.CheckAndIncrement((failures++) * 2);
- }
- };
-
- void TestNoServerImpl(unsigned port, bool oneWay) {
+
+ TestSync.CheckAndIncrement((failures++) * 2);
+ }
+ };
+
+ void TestNoServerImpl(unsigned port, bool oneWay) {
TNetAddr noServerAddr("localhost", port);
- TestNoServerImplClient client;
-
- int count = 0;
- for (; count < 200; ++count) {
- EMessageStatus status;
- if (oneWay) {
- status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
- } else {
+ TestNoServerImplClient client;
+
+ int count = 0;
+ for (; count < 200; ++count) {
+ EMessageStatus status;
+ if (oneWay) {
+ status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
+ } else {
TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
- status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
- }
-
+ status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
+ }
+
Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
-
- if (count == 0) {
- // lame way to wait until it is connected
- Sleep(TDuration::MilliSeconds(10));
- }
- client.TestSync.WaitForAndIncrement(count * 2 + 1);
- }
-
+
+ if (count == 0) {
+ // lame way to wait until it is connected
+ Sleep(TDuration::MilliSeconds(10));
+ }
+ client.TestSync.WaitForAndIncrement(count * 2 + 1);
+ }
+
client.TestSync.WaitForAndIncrement(count * 2);
- }
-
- void HangingServerImpl(unsigned port) {
- TNetAddr noServerAddr("localhost", port);
-
+ }
+
+ void HangingServerImpl(unsigned port) {
+ TNetAddr noServerAddr("localhost", port);
+
TExampleClient client;
int count = 0;
@@ -199,13 +199,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
THangingServer server(0);
- HangingServerImpl(server.GetPort());
+ HangingServerImpl(server.GetPort());
}
Y_UNIT_TEST(TestNoServer) {
TObjectCountCheck objectCountCheck;
- TestNoServerImpl(17, false);
+ TestNoServerImpl(17, false);
}
Y_UNIT_TEST(PauseInput) {
@@ -746,20 +746,20 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TResetAfterSendMessageOneWayDuringShutdown client;
- TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount);
- EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr);
+ TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount);
+ EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr);
UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
- client.TestSync.WaitForAndIncrement(2);
-
+ client.TestSync.WaitForAndIncrement(2);
+
client.Session->Shutdown();
- ok = client.Session->SendMessageOneWay(message);
+ ok = client.Session->SendMessageOneWay(message);
Y_VERIFY(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data());
-
- // check reset is possible here
- message->Reset();
- client.TestSync.CheckAndIncrement(3);
+
+ // check reset is possible here
+ message->Reset();
+ client.TestSync.CheckAndIncrement(3);
delete message;
}
@@ -767,7 +767,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) {
TObjectCountCheck objectCountCheck;
- TestNoServerImpl(17, true);
+ TestNoServerImpl(17, true);
}
struct TResetAfterSendOneWaySuccessClient: public TExampleClient {
@@ -904,40 +904,40 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
Sync.WaitForAndIncrement(2);
}
}
-
+
void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
- // We do not check for message errors in this test.
- }
-
+ // We do not check for message errors in this test.
+ }
+
void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
- }
+ }
};
struct TOnConnectionEventServer: public TExampleServer {
- TOnConnectionEventServer()
+ TOnConnectionEventServer()
: TExampleServer("TOnConnectionEventServer")
{
}
-
+
~TOnConnectionEventServer() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
- // We do not check for server message errors in this test.
- }
- };
-
+ // We do not check for server message errors in this test.
+ }
+ };
+
Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) {
TObjectCountCheck objectCountCheck;
- TOnConnectionEventServer server;
+ TOnConnectionEventServer server;
TOnConnectionEventClient client;
TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort());
- client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
+ client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
client.Sync.WaitForAndIncrement(1);
@@ -949,12 +949,12 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) {
TObjectCountCheck objectCountCheck;
- THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer);
+ THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer);
TOnConnectionEventClient client;
TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort());
- client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
+ client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
client.Sync.WaitForAndIncrement(1);
@@ -1059,66 +1059,66 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.WaitReplies();
server.WaitForOnMessageCount(test_msg_count);
- };
-
+ };
+
Y_UNIT_TEST(TestConnectionAttempts) {
- TObjectCountCheck objectCountCheck;
-
- TNetAddr noServerAddr("localhost", 17);
- TBusClientSessionConfig clientConfig;
+ TObjectCountCheck objectCountCheck;
+
+ TNetAddr noServerAddr("localhost", 17);
+ TBusClientSessionConfig clientConfig;
clientConfig.RetryInterval = 100;
TestNoServerImplClient client(clientConfig);
-
- int count = 0;
- for (; count < 10; ++count) {
- EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
- &noServerAddr);
-
+
+ int count = 0;
+ for (; count < 10; ++count) {
+ EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
+ &noServerAddr);
+
Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
- client.TestSync.WaitForAndIncrement(count * 2 + 1);
-
- // First connection attempt is for connect call; second one is to get connect result.
- UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
- }
- Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval));
- for (; count < 10; ++count) {
- EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
- &noServerAddr);
-
+ client.TestSync.WaitForAndIncrement(count * 2 + 1);
+
+ // First connection attempt is for connect call; second one is to get connect result.
+ UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+ }
+ Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval));
+ for (; count < 10; ++count) {
+ EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
+ &noServerAddr);
+
Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
- client.TestSync.WaitForAndIncrement(count * 2 + 1);
-
- // First connection attempt is for connect call; second one is to get connect result.
- UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4);
- }
- };
-
+ client.TestSync.WaitForAndIncrement(count * 2 + 1);
+
+ // First connection attempt is for connect call; second one is to get connect result.
+ UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4);
+ }
+ };
+
Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndNotReconnectWhenIdle) {
- TObjectCountCheck objectCountCheck;
-
- TNetAddr noServerAddr("localhost", 17);
- TBusClientSessionConfig clientConfig;
+ TObjectCountCheck objectCountCheck;
+
+ TNetAddr noServerAddr("localhost", 17);
+ TBusClientSessionConfig clientConfig;
clientConfig.RetryInterval = 100;
clientConfig.ReconnectWhenIdle = false;
TestNoServerImplClient client(clientConfig);
-
- int count = 0;
- for (; count < 10; ++count) {
- EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
- &noServerAddr);
-
+
+ int count = 0;
+ for (; count < 10; ++count) {
+ EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
+ &noServerAddr);
+
Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
- client.TestSync.WaitForAndIncrement(count * 2 + 1);
-
- // First connection attempt is for connect call; second one is to get connect result.
- UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
- }
-
+ client.TestSync.WaitForAndIncrement(count * 2 + 1);
+
+ // First connection attempt is for connect call; second one is to get connect result.
+ UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+ }
+
Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2));
- UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+ UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval));
- UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
- };
+ UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+ };
Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndReconnectWhenIdle) {
TObjectCountCheck objectCountCheck;