diff options
author | tesseract <tesseract@yandex-team.com> | 2023-09-30 13:18:12 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-09-30 13:31:58 +0300 |
commit | 8fdc2f37e43f846ca5405525508074333bae94b1 (patch) | |
tree | 3e3e08909be11e4906685b9fd5e4fa27df1bebca | |
parent | 387ebef344bd4106313ba94d62266865c2a33aee (diff) | |
download | ydb-8fdc2f37e43f846ca5405525508074333bae94b1.tar.gz |
Refactor TPersQueueNewSchemeCacheTest::TestReadAtTimestamp
intermediate #2
intermediate
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp | 128 |
1 files changed, 76 insertions, 52 deletions
diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index 3ea60c3588b..63996a29c7b 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -142,35 +142,43 @@ namespace NKikimr::NPersQueueTests { TTestServer server(false); server.ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(true); server.StartServer(); + server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::TX_PROXY_SCHEME_CACHE}); + + Cerr << ">>>>> Prepare scheme" << Endl; PrepareForGrpcNoDC(*server.AnnoyingClient); - NYdb::TDriverConfig driverCfg; - driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort).SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)).SetDatabase("/Root"); + Cerr << ">>>>> Create PersQueue client" << Endl; + NYdb::TDriverConfig driverCfg; + driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort) + .SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)) + .SetDatabase("/Root"); auto ydbDriver = MakeHolder<NYdb::TDriver>(driverCfg); auto persqueueClient = MakeHolder<NYdb::NPersQueue::TPersQueueClient>(*ydbDriver); - TString topic = "account2/topic2"; - server.EnableLogs({ NKikimrServices::PQ_READ_PROXY}); - - NYdb::NPersQueue::TWriteSessionSettings writeSessionSettings; - writeSessionSettings.ClusterDiscoveryMode(NYdb::NPersQueue::EClusterDiscoveryMode::Off) - .Path(topic) - .MessageGroupId(topic) - .Codec(NYdb::NPersQueue::ECodec::RAW); + // Topic was created in PrepareForGrpcNoDC + const TString topic = "account2/topic2"; + const TString consumerName = "userx"; { + Cerr << ">>>>> Create consumer '" << consumerName << "'" << Endl; auto res = persqueueClient->AddReadRule("/Root/" + topic, - NYdb::NPersQueue::TAddReadRuleSettings().ReadRule(NYdb::NPersQueue::TReadRuleSettings().ConsumerName("userx"))); + NYdb::NPersQueue::TAddReadRuleSettings() + .ReadRule(NYdb::NPersQueue::TReadRuleSettings() + .ConsumerName(consumerName))); res.Wait(); UNIT_ASSERT(res.GetValue().IsSuccess()); } - TVector<TInstant> ts; - TVector<ui32> firstOffset; - + Cerr << ">>>>> Create writeSession" << Endl; + auto writeSessionSettings = NYdb::NPersQueue::TWriteSessionSettings() + .ClusterDiscoveryMode(NYdb::NPersQueue::EClusterDiscoveryMode::Off) + .Path(topic) + .MessageGroupId(topic) + .Codec(NYdb::NPersQueue::ECodec::RAW); auto writeSession = persqueueClient->CreateWriteSession(writeSessionSettings); + TMaybe<TContinuationToken> continuationToken = Nothing(); ui32 messagesAcked = 0; auto processEvent = [&](TWriteSessionEvent::TEvent& event) { @@ -194,11 +202,13 @@ namespace NKikimr::NPersQueueTests { }, event); }; + Cerr << ">>>>> Receiving continuationToken" << Endl; for (auto& event: writeSession->GetEvents(true)) { processEvent(event); } UNIT_ASSERT(continuationToken.Defined()); + Cerr << ">>>>> Write messages" << Endl; for (ui32 i = 0; i < maxMessagesCount; ++i) { TString message = generateMessage(i); Cerr << "WRITTEN message " << i << "\n"; @@ -212,71 +222,86 @@ namespace NKikimr::NPersQueueTests { } } - //TODO check skip inside big blob - ui32 tsIt = 0; - while (true) { + // Ts and firstOffset and expectingQuantities will be set in first iteration of reading by received messages. + // Each will contains shifts from the message: before, equals and after. + // It allow check reading from different shift. First iteration read from zero. + TVector<TInstant> ts { TInstant::Zero() }; + TVector<ui32> firstOffset { 0 }; + TVector<size_t> expectingQuantities { maxMessagesCount }; + + // Start test scenario + + Cerr << ">>>>> Start reading" << Endl << Flush; + for (size_t i = 0; i < ts.size(); ++i) { + TInstant curTs = ts[i]; + size_t expectingQuantity = expectingQuantities[i]; + + Cerr << ">>>>> Iteration: " << i << " Start reading from " << curTs << ". ExpectingQuantity" << expectingQuantity << Endl << Flush; + + // Accumulate received messages + // Key is unique message body + // Value is quantity of received messages with it body + TMap<TString, size_t> map; + std::shared_ptr<NYdb::NPersQueue::IReadSession> reader; - TInstant curTs = tsIt == 0 ? TInstant::Zero() : (ts[tsIt]); auto settings = NYdb::NPersQueue::TReadSessionSettings() .AppendTopics(topic) - .ConsumerName("userx") + .ConsumerName(consumerName) .StartingMessageTimestamp(curTs) .ReadOnlyOriginal(true); - TMap<TString, ui32> map; - ui32 messagesReceived = 0; settings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) mutable { for (const auto& msg : event.GetMessages()) { - Cerr << "TS: " << curTs << " Got message: " << msg.DebugString(false) << Endl; - Cout.Flush(); - auto count = ++map[msg.GetData()]; - UNIT_ASSERT(count == 1); - if (tsIt == 0) { - if (ts.empty()) { - ts.push_back(TInstant::Zero()); - firstOffset.push_back(0); - } + Cerr << "Iteration: " << curTs << " Got message: " << msg.DebugString(false) << Endl << Flush; + auto count = ++map[msg.GetData()]; + UNIT_ASSERT_C(count == 1, "Each message must be received once"); + if (i == 0) { + // First iteration. Filling ts and firstOffset vectors from received messages ts.push_back(msg.GetWriteTime() - TDuration::MilliSeconds(1)); ts.push_back(msg.GetWriteTime()); ts.push_back(msg.GetWriteTime() + TDuration::MilliSeconds(1)); + firstOffset.push_back(msg.GetOffset()); firstOffset.push_back(msg.GetOffset()); firstOffset.push_back(msg.GetOffset() + 1); - Cerr << "GOT MESSAGE TIMESTAMP " << ts.back() << "\n"; - } else { - Cerr << "WAITING FIRST MESSAGE " << firstOffset[tsIt] << " got " << msg.GetOffset() << "\n"; + size_t prevQuantity = expectingQuantities.back(); + expectingQuantities.push_back(prevQuantity); + expectingQuantities.push_back(prevQuantity); + expectingQuantities.push_back(prevQuantity - 1); - UNIT_ASSERT(messagesReceived > 0 || msg.GetOffset() == firstOffset[tsIt]); + Cerr << "Iteration: " << i << " GOT MESSAGE TIMESTAMP " << ts.back() << Endl << Flush; + } else if (map.size() == 1) { + auto expectedOffset = firstOffset[i]; + UNIT_ASSERT_EQUAL_C(msg.GetOffset(), expectedOffset, + "Iteration: " << i << " Expected first message offset " << expectedOffset << " but got " << msg.GetOffset()); } - messagesReceived = msg.GetOffset() + 1; } }, false); reader = CreateReader(*ydbDriver, settings); - Cout << "Created reader\n"; + Cerr << ">>>>> Iteration: " << i << " Reader was created" << Endl << Flush; - Cout.Flush(); - while (messagesReceived < maxMessagesCount) Sleep(TDuration::MilliSeconds(10)); - Cerr << "Closing session. Got " << messagesReceived << " messages" << Endl; - reader->Close(TDuration::Seconds(0)); - Cerr << "Session closed" << Endl; + Cerr << ">>>>> Iteration: " << i << " Wait receiving all messages" << Endl << Flush; + Sleep(TDuration::MilliSeconds(10)); + while (map.size() < expectingQuantity) Sleep(TDuration::MilliSeconds(10)); - if (tsIt == 0) { - for (ui32 i = 0; i < ts.size(); ++i) { - Cout << "TS " << ts[i] << " OFFSET " << firstOffset[i] << "\n"; + Cerr << ">>>>> Iteration: " << i << " Closing session. Got " << map.size() << " messages" << Endl << Flush; + reader->Close(TDuration::Seconds(0)); + Cerr << ">>>>> Iteration: " << i << " Session closed" << Endl << Flush; + + if (i == 0) { + for (ui32 j = 1; j < ts.size(); ++j) { + Cerr << ">>>>> Planed iteration: " << j + << ". Start reading from time: " << ts[j] + << ". Expected first message offset: " << firstOffset[j] + << ". Expected message quantity: " << expectingQuantities[j] << Endl; } } - - - tsIt++; - if (tsIt == ts.size()) break; - if (firstOffset[tsIt] >= messagesReceived) break; } } - Y_UNIT_TEST(TestReadAtTimestamp) { auto generate1 = [](ui32 messageId) { Y_UNUSED(messageId); @@ -288,12 +313,11 @@ namespace NKikimr::NPersQueueTests { auto generate2 = [](ui32 messageId) { Y_UNUSED(messageId); - TString message = "Hello___" + CreateGuidAsString() + TString(1_MB, 'a'); + TString message = "Hello___" + CreateGuidAsString() + TString(10_MB, 'b'); return message; }; TestReadAtTimestampImpl(3, generate2); - } Y_UNIT_TEST(TestWriteStat1stClass) { |