aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-07-31 11:37:37 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-07-31 11:37:37 +0300
commit2a9b86aea68825318c083a9327c6d3df42543d5f (patch)
tree1bd5ab96a6eb5c7ed979769ac0241266f6db3639
parent8f3aab1e29cfbbe4943cda9a42c5a9e835bc3aa3 (diff)
downloadydb-2a9b86aea68825318c083a9327c6d3df42543d5f.tar.gz
Wait for TCommitOffsetAcknowledgementEvent
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp34
1 files changed, 26 insertions, 8 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
index a89ed6f171..0e172f64aa 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
@@ -290,17 +290,35 @@ namespace NYdb::NTopic::NTests {
.AppendTopics(setup.GetTestTopicPath());
auto readSession = client.CreateReadSession(readSettings);
- auto event = readSession->GetEvent(true);
- UNIT_ASSERT(event.Defined());
+ // Event 1: start partition session
+ {
+ TMaybe<TReadSessionEvent::TEvent> event = readSession->GetEvent(true);
+ UNIT_ASSERT(event);
+ auto startPartitionSession = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(event.Get());
+ UNIT_ASSERT_C(startPartitionSession, DebugString(*event));
+
+ startPartitionSession->Confirm();
+ }
+
+ // Event 2: data received
+ {
+ TMaybe<TReadSessionEvent::TEvent> event = readSession->GetEvent(true);
+ UNIT_ASSERT(event);
+ auto dataReceived = std::get_if<TReadSessionEvent::TDataReceivedEvent>(event.Get());
+ UNIT_ASSERT_C(dataReceived, DebugString(*event));
- auto& startPartitionSession = std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event);
- startPartitionSession.Confirm();
+ dataReceived->Commit();
+ }
- event = readSession->GetEvent(true);
- UNIT_ASSERT(event.Defined());
+ // Event 3: commit acknowledgement
+ {
+ TMaybe<TReadSessionEvent::TEvent> event = readSession->GetEvent(true);
+ UNIT_ASSERT(event);
+ auto commitOffsetAck = std::get_if<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(event.Get());
+ UNIT_ASSERT_C(commitOffsetAck, DebugString(*event));
- auto& dataReceived = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
- dataReceived.Commit();
+ UNIT_ASSERT_VALUES_EQUAL(commitOffsetAck->GetCommittedOffset(), 1);
+ }
}
// Get non-empty description