aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-09-30 13:18:12 +0300
committertesseract <tesseract@yandex-team.com>2023-09-30 13:31:58 +0300
commit8fdc2f37e43f846ca5405525508074333bae94b1 (patch)
tree3e3e08909be11e4906685b9fd5e4fa27df1bebca
parent387ebef344bd4106313ba94d62266865c2a33aee (diff)
downloadydb-8fdc2f37e43f846ca5405525508074333bae94b1.tar.gz
Refactor TPersQueueNewSchemeCacheTest::TestReadAtTimestamp
intermediate #2 intermediate
-rw-r--r--ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp128
1 files changed, 76 insertions, 52 deletions
diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
index 3ea60c3588b..63996a29c7b 100644
--- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
@@ -142,35 +142,43 @@ namespace NKikimr::NPersQueueTests {
TTestServer server(false);
server.ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(true);
server.StartServer();
+
server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::TX_PROXY_SCHEME_CACHE});
+
+ Cerr << ">>>>> Prepare scheme" << Endl;
PrepareForGrpcNoDC(*server.AnnoyingClient);
- NYdb::TDriverConfig driverCfg;
- driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort).SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)).SetDatabase("/Root");
+ Cerr << ">>>>> Create PersQueue client" << Endl;
+ NYdb::TDriverConfig driverCfg;
+ driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort)
+ .SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG))
+ .SetDatabase("/Root");
auto ydbDriver = MakeHolder<NYdb::TDriver>(driverCfg);
auto persqueueClient = MakeHolder<NYdb::NPersQueue::TPersQueueClient>(*ydbDriver);
- TString topic = "account2/topic2";
- server.EnableLogs({ NKikimrServices::PQ_READ_PROXY});
-
- NYdb::NPersQueue::TWriteSessionSettings writeSessionSettings;
- writeSessionSettings.ClusterDiscoveryMode(NYdb::NPersQueue::EClusterDiscoveryMode::Off)
- .Path(topic)
- .MessageGroupId(topic)
- .Codec(NYdb::NPersQueue::ECodec::RAW);
+ // Topic was created in PrepareForGrpcNoDC
+ const TString topic = "account2/topic2";
+ const TString consumerName = "userx";
{
+ Cerr << ">>>>> Create consumer '" << consumerName << "'" << Endl;
auto res = persqueueClient->AddReadRule("/Root/" + topic,
- NYdb::NPersQueue::TAddReadRuleSettings().ReadRule(NYdb::NPersQueue::TReadRuleSettings().ConsumerName("userx")));
+ NYdb::NPersQueue::TAddReadRuleSettings()
+ .ReadRule(NYdb::NPersQueue::TReadRuleSettings()
+ .ConsumerName(consumerName)));
res.Wait();
UNIT_ASSERT(res.GetValue().IsSuccess());
}
- TVector<TInstant> ts;
- TVector<ui32> firstOffset;
-
+ Cerr << ">>>>> Create writeSession" << Endl;
+ auto writeSessionSettings = NYdb::NPersQueue::TWriteSessionSettings()
+ .ClusterDiscoveryMode(NYdb::NPersQueue::EClusterDiscoveryMode::Off)
+ .Path(topic)
+ .MessageGroupId(topic)
+ .Codec(NYdb::NPersQueue::ECodec::RAW);
auto writeSession = persqueueClient->CreateWriteSession(writeSessionSettings);
+
TMaybe<TContinuationToken> continuationToken = Nothing();
ui32 messagesAcked = 0;
auto processEvent = [&](TWriteSessionEvent::TEvent& event) {
@@ -194,11 +202,13 @@ namespace NKikimr::NPersQueueTests {
}, event);
};
+ Cerr << ">>>>> Receiving continuationToken" << Endl;
for (auto& event: writeSession->GetEvents(true)) {
processEvent(event);
}
UNIT_ASSERT(continuationToken.Defined());
+ Cerr << ">>>>> Write messages" << Endl;
for (ui32 i = 0; i < maxMessagesCount; ++i) {
TString message = generateMessage(i);
Cerr << "WRITTEN message " << i << "\n";
@@ -212,71 +222,86 @@ namespace NKikimr::NPersQueueTests {
}
}
- //TODO check skip inside big blob
- ui32 tsIt = 0;
- while (true) {
+ // Ts and firstOffset and expectingQuantities will be set in first iteration of reading by received messages.
+ // Each will contains shifts from the message: before, equals and after.
+ // It allow check reading from different shift. First iteration read from zero.
+ TVector<TInstant> ts { TInstant::Zero() };
+ TVector<ui32> firstOffset { 0 };
+ TVector<size_t> expectingQuantities { maxMessagesCount };
+
+ // Start test scenario
+
+ Cerr << ">>>>> Start reading" << Endl << Flush;
+ for (size_t i = 0; i < ts.size(); ++i) {
+ TInstant curTs = ts[i];
+ size_t expectingQuantity = expectingQuantities[i];
+
+ Cerr << ">>>>> Iteration: " << i << " Start reading from " << curTs << ". ExpectingQuantity" << expectingQuantity << Endl << Flush;
+
+ // Accumulate received messages
+ // Key is unique message body
+ // Value is quantity of received messages with it body
+ TMap<TString, size_t> map;
+
std::shared_ptr<NYdb::NPersQueue::IReadSession> reader;
- TInstant curTs = tsIt == 0 ? TInstant::Zero() : (ts[tsIt]);
auto settings = NYdb::NPersQueue::TReadSessionSettings()
.AppendTopics(topic)
- .ConsumerName("userx")
+ .ConsumerName(consumerName)
.StartingMessageTimestamp(curTs)
.ReadOnlyOriginal(true);
- TMap<TString, ui32> map;
- ui32 messagesReceived = 0;
settings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) mutable {
for (const auto& msg : event.GetMessages()) {
- Cerr << "TS: " << curTs << " Got message: " << msg.DebugString(false) << Endl;
- Cout.Flush();
- auto count = ++map[msg.GetData()];
- UNIT_ASSERT(count == 1);
- if (tsIt == 0) {
- if (ts.empty()) {
- ts.push_back(TInstant::Zero());
- firstOffset.push_back(0);
- }
+ Cerr << "Iteration: " << curTs << " Got message: " << msg.DebugString(false) << Endl << Flush;
+ auto count = ++map[msg.GetData()];
+ UNIT_ASSERT_C(count == 1, "Each message must be received once");
+ if (i == 0) {
+ // First iteration. Filling ts and firstOffset vectors from received messages
ts.push_back(msg.GetWriteTime() - TDuration::MilliSeconds(1));
ts.push_back(msg.GetWriteTime());
ts.push_back(msg.GetWriteTime() + TDuration::MilliSeconds(1));
+
firstOffset.push_back(msg.GetOffset());
firstOffset.push_back(msg.GetOffset());
firstOffset.push_back(msg.GetOffset() + 1);
- Cerr << "GOT MESSAGE TIMESTAMP " << ts.back() << "\n";
- } else {
- Cerr << "WAITING FIRST MESSAGE " << firstOffset[tsIt] << " got " << msg.GetOffset() << "\n";
+ size_t prevQuantity = expectingQuantities.back();
+ expectingQuantities.push_back(prevQuantity);
+ expectingQuantities.push_back(prevQuantity);
+ expectingQuantities.push_back(prevQuantity - 1);
- UNIT_ASSERT(messagesReceived > 0 || msg.GetOffset() == firstOffset[tsIt]);
+ Cerr << "Iteration: " << i << " GOT MESSAGE TIMESTAMP " << ts.back() << Endl << Flush;
+ } else if (map.size() == 1) {
+ auto expectedOffset = firstOffset[i];
+ UNIT_ASSERT_EQUAL_C(msg.GetOffset(), expectedOffset,
+ "Iteration: " << i << " Expected first message offset " << expectedOffset << " but got " << msg.GetOffset());
}
- messagesReceived = msg.GetOffset() + 1;
}
}, false);
reader = CreateReader(*ydbDriver, settings);
- Cout << "Created reader\n";
+ Cerr << ">>>>> Iteration: " << i << " Reader was created" << Endl << Flush;
- Cout.Flush();
- while (messagesReceived < maxMessagesCount) Sleep(TDuration::MilliSeconds(10));
- Cerr << "Closing session. Got " << messagesReceived << " messages" << Endl;
- reader->Close(TDuration::Seconds(0));
- Cerr << "Session closed" << Endl;
+ Cerr << ">>>>> Iteration: " << i << " Wait receiving all messages" << Endl << Flush;
+ Sleep(TDuration::MilliSeconds(10));
+ while (map.size() < expectingQuantity) Sleep(TDuration::MilliSeconds(10));
- if (tsIt == 0) {
- for (ui32 i = 0; i < ts.size(); ++i) {
- Cout << "TS " << ts[i] << " OFFSET " << firstOffset[i] << "\n";
+ Cerr << ">>>>> Iteration: " << i << " Closing session. Got " << map.size() << " messages" << Endl << Flush;
+ reader->Close(TDuration::Seconds(0));
+ Cerr << ">>>>> Iteration: " << i << " Session closed" << Endl << Flush;
+
+ if (i == 0) {
+ for (ui32 j = 1; j < ts.size(); ++j) {
+ Cerr << ">>>>> Planed iteration: " << j
+ << ". Start reading from time: " << ts[j]
+ << ". Expected first message offset: " << firstOffset[j]
+ << ". Expected message quantity: " << expectingQuantities[j] << Endl;
}
}
-
-
- tsIt++;
- if (tsIt == ts.size()) break;
- if (firstOffset[tsIt] >= messagesReceived) break;
}
}
-
Y_UNIT_TEST(TestReadAtTimestamp) {
auto generate1 = [](ui32 messageId) {
Y_UNUSED(messageId);
@@ -288,12 +313,11 @@ namespace NKikimr::NPersQueueTests {
auto generate2 = [](ui32 messageId) {
Y_UNUSED(messageId);
- TString message = "Hello___" + CreateGuidAsString() + TString(1_MB, 'a');
+ TString message = "Hello___" + CreateGuidAsString() + TString(10_MB, 'b');
return message;
};
TestReadAtTimestampImpl(3, generate2);
-
}
Y_UNIT_TEST(TestWriteStat1stClass) {