diff options
author | tesseract <tesseract@yandex-team.com> | 2023-10-03 15:02:41 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-10-03 16:10:09 +0300 |
commit | b8c57b027c3584fb00c34fa01a42b1bfb0282521 (patch) | |
tree | 5b9c2e64bf0fdab4bf131308b42f98295ba81221 | |
parent | 5478b8f55cc7055a4861c4030e0c401b5c72714c (diff) | |
download | ydb-b8c57b027c3584fb00c34fa01a42b1bfb0282521.tar.gz |
Fixed the order of messages
fix message order
accurately close read session
3 files changed, 43 insertions, 26 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index 29440c8d363..17d231c626b 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -726,6 +726,10 @@ public: TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>& compressedMessages, TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); + TMutex& GetLock() { + return Lock; + } + private: const TKey Key; ui64 AssignId; @@ -737,6 +741,8 @@ private: TDisjointIntervalTree<ui64> Commits; TDisjointIntervalTree<ui64> ClientCommits; + + TMutex Lock; }; template <bool UseMigrationProtocol> diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 5256feb1fee..1388158baa1 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -180,6 +180,7 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::SignalReadyEvents(TInt compressedMessages, accumulator, NotReady); + TDataReceivedEvent<UseMigrationProtocol> data(std::move(messages), std::move(compressedMessages), stream); @@ -2096,9 +2097,11 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalReadyEvents( TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream) { Y_ASSERT(partitionStream); - TDeferredActions<UseMigrationProtocol> deferred; - with_lock (TParent::Mutex) { - SignalReadyEventsImpl(partitionStream, deferred); + with_lock (partitionStream->GetLock()) { + TDeferredActions<UseMigrationProtocol> deferred; + with_lock (TParent::Mutex) { + SignalReadyEventsImpl(partitionStream, deferred); + } } } @@ -2174,7 +2177,6 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventCallbackSettings template<bool UseMigrationProtocol> void TReadSessionEventsQueue<UseMigrationProtocol>::ClearAllEvents() { - TDeferredActions<UseMigrationProtocol> deferred; with_lock (TParent::Mutex) { while (!TParent::Events.empty()) { auto& event = TParent::Events.front(); diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index 63996a29c7b..5258dfd286b 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -220,6 +220,7 @@ namespace NKikimr::NPersQueueTests { processEvent(event); } } + Sleep(TDuration::MilliSeconds(10)); } // Ts and firstOffset and expectingQuantities will be set in first iteration of reading by received messages. @@ -236,7 +237,7 @@ namespace NKikimr::NPersQueueTests { TInstant curTs = ts[i]; size_t expectingQuantity = expectingQuantities[i]; - Cerr << ">>>>> Iteration: " << i << " Start reading from " << curTs << ". ExpectingQuantity" << expectingQuantity << Endl << Flush; + Cerr << ">>>>> Iteration: " << i << " Start reading from " << curTs << ". ExpectingQuantity " << expectingQuantity << Endl << Flush; // Accumulate received messages // Key is unique message body @@ -250,9 +251,14 @@ namespace NKikimr::NPersQueueTests { .StartingMessageTimestamp(curTs) .ReadOnlyOriginal(true); + ui32 lastOffset = 0; + settings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) mutable { + Cerr << ">>>>> Iteration: " << i << " TDataReceivedEvent: " << event.DebugString(false) + << " size=" << event.GetMessages().size() << Endl << Flush; for (const auto& msg : event.GetMessages()) { - Cerr << "Iteration: " << curTs << " Got message: " << msg.DebugString(false) << Endl << Flush; + Cerr << ">>>>> Iteration: " << i << " Got message: " << msg.GetData().substr(0, 16) + << " :: " << msg.DebugString(false) << Endl << Flush; auto count = ++map[msg.GetData()]; UNIT_ASSERT_C(count == 1, "Each message must be received once"); @@ -271,11 +277,20 @@ namespace NKikimr::NPersQueueTests { expectingQuantities.push_back(prevQuantity); expectingQuantities.push_back(prevQuantity - 1); - Cerr << "Iteration: " << i << " GOT MESSAGE TIMESTAMP " << ts.back() << Endl << Flush; - } else if (map.size() == 1) { - auto expectedOffset = firstOffset[i]; - UNIT_ASSERT_EQUAL_C(msg.GetOffset(), expectedOffset, - "Iteration: " << i << " Expected first message offset " << expectedOffset << " but got " << msg.GetOffset()); + Cerr << ">>>>> Iteration: " << i << " GOT MESSAGE TIMESTAMP " << msg.GetWriteTime() << Endl << Flush; + } else { + if (map.size() == 1) { + auto expectedOffset = firstOffset[i]; + UNIT_ASSERT_EQUAL_C(msg.GetOffset(), expectedOffset, "Iteration: " << i + << " Expected first message offset " << expectedOffset + << " but got " << msg.GetOffset()); + } else { + UNIT_ASSERT_C(lastOffset < msg.GetOffset(), "Iteration: " << i + << " unexpected offset order. Last offset " << lastOffset + << " Message offset " << msg.GetOffset()); + } + + lastOffset = msg.GetOffset(); } } }, false); @@ -285,10 +300,12 @@ namespace NKikimr::NPersQueueTests { Cerr << ">>>>> Iteration: " << i << " Wait receiving all messages" << Endl << Flush; Sleep(TDuration::MilliSeconds(10)); - while (map.size() < expectingQuantity) Sleep(TDuration::MilliSeconds(10)); + for (size_t k = 0; k < 1000 && map.size() < expectingQuantity; ++k) { // Wait 10 seconds + Sleep(TDuration::MilliSeconds(10)); + } Cerr << ">>>>> Iteration: " << i << " Closing session. Got " << map.size() << " messages" << Endl << Flush; - reader->Close(TDuration::Seconds(0)); + while(!reader->Close(TDuration::Seconds(1))) {}; Cerr << ">>>>> Iteration: " << i << " Session closed" << Endl << Flush; if (i == 0) { @@ -299,25 +316,17 @@ namespace NKikimr::NPersQueueTests { << ". Expected message quantity: " << expectingQuantities[j] << Endl; } } + UNIT_ASSERT_EQUAL_C(map.size(), expectingQuantity, "Wrong message number. Received: " << map.size() << ". Excpected: " << expectingQuantity); } } Y_UNIT_TEST(TestReadAtTimestamp) { - auto generate1 = [](ui32 messageId) { - Y_UNUSED(messageId); - TString message = "Hello___" + CreateGuidAsString() + TString(1_MB, 'a'); - return message; - }; - - TestReadAtTimestampImpl(10, generate1); - - auto generate2 = [](ui32 messageId) { - Y_UNUSED(messageId); - TString message = "Hello___" + CreateGuidAsString() + TString(10_MB, 'b'); - return message; + auto generate = [](ui32 messageId) { + return TStringBuilder() << "Hello___" << messageId << "___" << CreateGuidAsString() << TString(1_MB, 'a'); }; - TestReadAtTimestampImpl(3, generate2); + TestReadAtTimestampImpl(10, generate); + TestReadAtTimestampImpl(3, generate); } Y_UNIT_TEST(TestWriteStat1stClass) { |