diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-03-17 15:00:21 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-03-17 15:00:21 +0300 |
commit | 789c30e0cc85b443de9f8f409c32fd0e7b3cd42b (patch) | |
tree | 0879910cf6c0c1b5d774947e41ee79cf967b13d9 | |
parent | 36a55ecf2533a755443ba6b35a54ccfeb78e7290 (diff) | |
download | ydb-789c30e0cc85b443de9f8f409c32fd0e7b3cd42b.tar.gz |
Additional ReadTopic test
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp | 95 |
1 files changed, 82 insertions, 13 deletions
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp index d6f0cb5be6e..59be35e268d 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp @@ -49,12 +49,6 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { UNIT_ASSERT(!resp->Get()->Record.GetToken().empty()); } - template <typename TEvResponse> - auto SendImpl(const TActorId& recipient, IEventBase* ev) { - Server.GetRuntime()->Send(new IEventHandleFat(recipient, Sender, ev)); - return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender); - } - public: TEnv(bool init = true) : Settings(Tests::TServerSettings(PortManager.GetPort(), {}, MakePqConfig()) @@ -110,14 +104,28 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { } } + void SendAsync(const TActorId& recipient, IEventBase* ev) { + Server.GetRuntime()->Send(new IEventHandleFat(recipient, Sender, ev)); + } + template <typename TEvResponse> - auto Send(IEventBase* ev) { - return SendImpl<TEvResponse>(YdbProxy, ev); + auto Wait(bool rethrow = false) { + if (rethrow) { + return Server.GetRuntime()->GrabEdgeEventRethrow<TEvResponse>(Sender); + } else { + return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender); + } } template <typename TEvResponse> auto Send(const TActorId& recipient, IEventBase* ev) { - return SendImpl<TEvResponse>(recipient, ev); + SendAsync(recipient, ev); + return Wait<TEvResponse>(); + } + + template <typename TEvResponse> + auto Send(IEventBase* ev) { + return Send<TEvResponse>(YdbProxy, ev); } const NYdb::TDriver& GetDriver() const { @@ -734,16 +742,21 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { return result; } + template <typename TEvent> + TEvent ReadTopicAsync(TEvYdbProxy::TEvReadTopicResponse::TPtr& ev) { + const auto* event = std::get_if<TEvent>(&ev->Get()->Result); + UNIT_ASSERT(event); + + return *event; + } + template <typename TEvent, typename Env> TEvent ReadTopic(Env& env, const TActorId& reader) { auto ev = env.template Send<TEvYdbProxy::TEvReadTopicResponse>(reader, new TEvYdbProxy::TEvReadTopicRequest()); UNIT_ASSERT(ev); - const auto* event = std::get_if<TEvent>(&ev->Get()->Result); - UNIT_ASSERT(event); - - return *event; + return ReadTopicAsync<TEvent>(ev); } Y_UNIT_TEST(ReadTopic) { @@ -777,6 +790,62 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { auto ack = ReadTopic<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(env, reader); UNIT_ASSERT_VALUES_EQUAL(ack.GetCommittedOffset(), 1); } + + // wait next event + env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest()); + + TActorId newReader = CreateTopicReader(env, "/Root/topic"); + // wait next event + env.SendAsync(newReader, new TEvYdbProxy::TEvReadTopicRequest()); + + bool stopped = false; + bool closed = false; + bool started = false; + while (!stopped || !closed || !started) { + // wait response from any reader + TEvYdbProxy::TEvReadTopicResponse::TPtr ev; + try { + ev = env.Wait<TEvYdbProxy::TEvReadTopicResponse>(true); + } catch (yexception&) { + // bad luck, previous session was not closed, close it manually + env.SendAsync(reader, new TEvents::TEvPoison()); + stopped = closed = true; + continue; + } + + if (ev->Sender == reader) { + if (!stopped) { + ReadTopicAsync<TReadSessionEvent::TStopPartitionSessionEvent>(ev).Confirm(); + env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest()); + stopped = true; + } else if (!closed) { + ReadTopicAsync<TReadSessionEvent::TPartitionSessionClosedEvent>(ev); + closed = true; + } else { + UNIT_ASSERT_C(false, "Unexpected event from previous reader"); + } + } else if (ev->Sender == newReader) { + if (!started) { + ReadTopicAsync<TReadSessionEvent::TStartPartitionSessionEvent>(ev).Confirm(); + started = true; + } else { + UNIT_ASSERT_C(false, "Unexpected event from new reader"); + } + } else { + UNIT_ASSERT_C(false, "Unknown reader"); + } + } + + UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-2")); + { + auto data = ReadTopic<TReadSessionEvent::TDataReceivedEvent>(env, newReader); + UNIT_ASSERT_VALUES_EQUAL(data.GetMessages().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(data.GetMessages().at(0).GetData(), "message-2"); + data.Commit(); + + auto ack = ReadTopic<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(env, newReader); + UNIT_ASSERT_VALUES_EQUAL(ack.GetCommittedOffset(), 2); + } } } // YdbProxyTests |