aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-03-17 15:00:21 +0300
committerilnaz <ilnaz@ydb.tech>2023-03-17 15:00:21 +0300
commit789c30e0cc85b443de9f8f409c32fd0e7b3cd42b (patch)
tree0879910cf6c0c1b5d774947e41ee79cf967b13d9
parent36a55ecf2533a755443ba6b35a54ccfeb78e7290 (diff)
downloadydb-789c30e0cc85b443de9f8f409c32fd0e7b3cd42b.tar.gz
Additional ReadTopic test
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp95
1 files changed, 82 insertions, 13 deletions
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
index d6f0cb5be6e..59be35e268d 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
@@ -49,12 +49,6 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
UNIT_ASSERT(!resp->Get()->Record.GetToken().empty());
}
- template <typename TEvResponse>
- auto SendImpl(const TActorId& recipient, IEventBase* ev) {
- Server.GetRuntime()->Send(new IEventHandleFat(recipient, Sender, ev));
- return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender);
- }
-
public:
TEnv(bool init = true)
: Settings(Tests::TServerSettings(PortManager.GetPort(), {}, MakePqConfig())
@@ -110,14 +104,28 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
}
}
+ void SendAsync(const TActorId& recipient, IEventBase* ev) {
+ Server.GetRuntime()->Send(new IEventHandleFat(recipient, Sender, ev));
+ }
+
template <typename TEvResponse>
- auto Send(IEventBase* ev) {
- return SendImpl<TEvResponse>(YdbProxy, ev);
+ auto Wait(bool rethrow = false) {
+ if (rethrow) {
+ return Server.GetRuntime()->GrabEdgeEventRethrow<TEvResponse>(Sender);
+ } else {
+ return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender);
+ }
}
template <typename TEvResponse>
auto Send(const TActorId& recipient, IEventBase* ev) {
- return SendImpl<TEvResponse>(recipient, ev);
+ SendAsync(recipient, ev);
+ return Wait<TEvResponse>();
+ }
+
+ template <typename TEvResponse>
+ auto Send(IEventBase* ev) {
+ return Send<TEvResponse>(YdbProxy, ev);
}
const NYdb::TDriver& GetDriver() const {
@@ -734,16 +742,21 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
return result;
}
+ template <typename TEvent>
+ TEvent ReadTopicAsync(TEvYdbProxy::TEvReadTopicResponse::TPtr& ev) {
+ const auto* event = std::get_if<TEvent>(&ev->Get()->Result);
+ UNIT_ASSERT(event);
+
+ return *event;
+ }
+
template <typename TEvent, typename Env>
TEvent ReadTopic(Env& env, const TActorId& reader) {
auto ev = env.template Send<TEvYdbProxy::TEvReadTopicResponse>(reader,
new TEvYdbProxy::TEvReadTopicRequest());
UNIT_ASSERT(ev);
- const auto* event = std::get_if<TEvent>(&ev->Get()->Result);
- UNIT_ASSERT(event);
-
- return *event;
+ return ReadTopicAsync<TEvent>(ev);
}
Y_UNIT_TEST(ReadTopic) {
@@ -777,6 +790,62 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
auto ack = ReadTopic<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(env, reader);
UNIT_ASSERT_VALUES_EQUAL(ack.GetCommittedOffset(), 1);
}
+
+ // wait next event
+ env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest());
+
+ TActorId newReader = CreateTopicReader(env, "/Root/topic");
+ // wait next event
+ env.SendAsync(newReader, new TEvYdbProxy::TEvReadTopicRequest());
+
+ bool stopped = false;
+ bool closed = false;
+ bool started = false;
+ while (!stopped || !closed || !started) {
+ // wait response from any reader
+ TEvYdbProxy::TEvReadTopicResponse::TPtr ev;
+ try {
+ ev = env.Wait<TEvYdbProxy::TEvReadTopicResponse>(true);
+ } catch (yexception&) {
+ // bad luck, previous session was not closed, close it manually
+ env.SendAsync(reader, new TEvents::TEvPoison());
+ stopped = closed = true;
+ continue;
+ }
+
+ if (ev->Sender == reader) {
+ if (!stopped) {
+ ReadTopicAsync<TReadSessionEvent::TStopPartitionSessionEvent>(ev).Confirm();
+ env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest());
+ stopped = true;
+ } else if (!closed) {
+ ReadTopicAsync<TReadSessionEvent::TPartitionSessionClosedEvent>(ev);
+ closed = true;
+ } else {
+ UNIT_ASSERT_C(false, "Unexpected event from previous reader");
+ }
+ } else if (ev->Sender == newReader) {
+ if (!started) {
+ ReadTopicAsync<TReadSessionEvent::TStartPartitionSessionEvent>(ev).Confirm();
+ started = true;
+ } else {
+ UNIT_ASSERT_C(false, "Unexpected event from new reader");
+ }
+ } else {
+ UNIT_ASSERT_C(false, "Unknown reader");
+ }
+ }
+
+ UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-2"));
+ {
+ auto data = ReadTopic<TReadSessionEvent::TDataReceivedEvent>(env, newReader);
+ UNIT_ASSERT_VALUES_EQUAL(data.GetMessages().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(data.GetMessages().at(0).GetData(), "message-2");
+ data.Commit();
+
+ auto ack = ReadTopic<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(env, newReader);
+ UNIT_ASSERT_VALUES_EQUAL(ack.GetCommittedOffset(), 2);
+ }
}
} // YdbProxyTests