aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-10-03 15:02:41 +0300
committertesseract <tesseract@yandex-team.com>2023-10-03 16:10:09 +0300
commitb8c57b027c3584fb00c34fa01a42b1bfb0282521 (patch)
tree5b9c2e64bf0fdab4bf131308b42f98295ba81221
parent5478b8f55cc7055a4861c4030e0c401b5c72714c (diff)
downloadydb-b8c57b027c3584fb00c34fa01a42b1bfb0282521.tar.gz
Fixed the order of messages
fix message order accurately close read session
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp10
-rw-r--r--ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp53
3 files changed, 43 insertions, 26 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index 29440c8d363..17d231c626b 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -726,6 +726,10 @@ public:
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>& compressedMessages,
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator);
+ TMutex& GetLock() {
+ return Lock;
+ }
+
private:
const TKey Key;
ui64 AssignId;
@@ -737,6 +741,8 @@ private:
TDisjointIntervalTree<ui64> Commits;
TDisjointIntervalTree<ui64> ClientCommits;
+
+ TMutex Lock;
};
template <bool UseMigrationProtocol>
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index 5256feb1fee..1388158baa1 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -180,6 +180,7 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::SignalReadyEvents(TInt
compressedMessages,
accumulator,
NotReady);
+
TDataReceivedEvent<UseMigrationProtocol> data(std::move(messages),
std::move(compressedMessages),
stream);
@@ -2096,9 +2097,11 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalReadyEvents(
TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream) {
Y_ASSERT(partitionStream);
- TDeferredActions<UseMigrationProtocol> deferred;
- with_lock (TParent::Mutex) {
- SignalReadyEventsImpl(partitionStream, deferred);
+ with_lock (partitionStream->GetLock()) {
+ TDeferredActions<UseMigrationProtocol> deferred;
+ with_lock (TParent::Mutex) {
+ SignalReadyEventsImpl(partitionStream, deferred);
+ }
}
}
@@ -2174,7 +2177,6 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventCallbackSettings
template<bool UseMigrationProtocol>
void TReadSessionEventsQueue<UseMigrationProtocol>::ClearAllEvents() {
- TDeferredActions<UseMigrationProtocol> deferred;
with_lock (TParent::Mutex) {
while (!TParent::Events.empty()) {
auto& event = TParent::Events.front();
diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
index 63996a29c7b..5258dfd286b 100644
--- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
@@ -220,6 +220,7 @@ namespace NKikimr::NPersQueueTests {
processEvent(event);
}
}
+ Sleep(TDuration::MilliSeconds(10));
}
// Ts and firstOffset and expectingQuantities will be set in first iteration of reading by received messages.
@@ -236,7 +237,7 @@ namespace NKikimr::NPersQueueTests {
TInstant curTs = ts[i];
size_t expectingQuantity = expectingQuantities[i];
- Cerr << ">>>>> Iteration: " << i << " Start reading from " << curTs << ". ExpectingQuantity" << expectingQuantity << Endl << Flush;
+ Cerr << ">>>>> Iteration: " << i << " Start reading from " << curTs << ". ExpectingQuantity " << expectingQuantity << Endl << Flush;
// Accumulate received messages
// Key is unique message body
@@ -250,9 +251,14 @@ namespace NKikimr::NPersQueueTests {
.StartingMessageTimestamp(curTs)
.ReadOnlyOriginal(true);
+ ui32 lastOffset = 0;
+
settings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) mutable {
+ Cerr << ">>>>> Iteration: " << i << " TDataReceivedEvent: " << event.DebugString(false)
+ << " size=" << event.GetMessages().size() << Endl << Flush;
for (const auto& msg : event.GetMessages()) {
- Cerr << "Iteration: " << curTs << " Got message: " << msg.DebugString(false) << Endl << Flush;
+ Cerr << ">>>>> Iteration: " << i << " Got message: " << msg.GetData().substr(0, 16)
+ << " :: " << msg.DebugString(false) << Endl << Flush;
auto count = ++map[msg.GetData()];
UNIT_ASSERT_C(count == 1, "Each message must be received once");
@@ -271,11 +277,20 @@ namespace NKikimr::NPersQueueTests {
expectingQuantities.push_back(prevQuantity);
expectingQuantities.push_back(prevQuantity - 1);
- Cerr << "Iteration: " << i << " GOT MESSAGE TIMESTAMP " << ts.back() << Endl << Flush;
- } else if (map.size() == 1) {
- auto expectedOffset = firstOffset[i];
- UNIT_ASSERT_EQUAL_C(msg.GetOffset(), expectedOffset,
- "Iteration: " << i << " Expected first message offset " << expectedOffset << " but got " << msg.GetOffset());
+ Cerr << ">>>>> Iteration: " << i << " GOT MESSAGE TIMESTAMP " << msg.GetWriteTime() << Endl << Flush;
+ } else {
+ if (map.size() == 1) {
+ auto expectedOffset = firstOffset[i];
+ UNIT_ASSERT_EQUAL_C(msg.GetOffset(), expectedOffset, "Iteration: " << i
+ << " Expected first message offset " << expectedOffset
+ << " but got " << msg.GetOffset());
+ } else {
+ UNIT_ASSERT_C(lastOffset < msg.GetOffset(), "Iteration: " << i
+ << " unexpected offset order. Last offset " << lastOffset
+ << " Message offset " << msg.GetOffset());
+ }
+
+ lastOffset = msg.GetOffset();
}
}
}, false);
@@ -285,10 +300,12 @@ namespace NKikimr::NPersQueueTests {
Cerr << ">>>>> Iteration: " << i << " Wait receiving all messages" << Endl << Flush;
Sleep(TDuration::MilliSeconds(10));
- while (map.size() < expectingQuantity) Sleep(TDuration::MilliSeconds(10));
+ for (size_t k = 0; k < 1000 && map.size() < expectingQuantity; ++k) { // Wait 10 seconds
+ Sleep(TDuration::MilliSeconds(10));
+ }
Cerr << ">>>>> Iteration: " << i << " Closing session. Got " << map.size() << " messages" << Endl << Flush;
- reader->Close(TDuration::Seconds(0));
+ while(!reader->Close(TDuration::Seconds(1))) {};
Cerr << ">>>>> Iteration: " << i << " Session closed" << Endl << Flush;
if (i == 0) {
@@ -299,25 +316,17 @@ namespace NKikimr::NPersQueueTests {
<< ". Expected message quantity: " << expectingQuantities[j] << Endl;
}
}
+ UNIT_ASSERT_EQUAL_C(map.size(), expectingQuantity, "Wrong message number. Received: " << map.size() << ". Excpected: " << expectingQuantity);
}
}
Y_UNIT_TEST(TestReadAtTimestamp) {
- auto generate1 = [](ui32 messageId) {
- Y_UNUSED(messageId);
- TString message = "Hello___" + CreateGuidAsString() + TString(1_MB, 'a');
- return message;
- };
-
- TestReadAtTimestampImpl(10, generate1);
-
- auto generate2 = [](ui32 messageId) {
- Y_UNUSED(messageId);
- TString message = "Hello___" + CreateGuidAsString() + TString(10_MB, 'b');
- return message;
+ auto generate = [](ui32 messageId) {
+ return TStringBuilder() << "Hello___" << messageId << "___" << CreateGuidAsString() << TString(1_MB, 'a');
};
- TestReadAtTimestampImpl(3, generate2);
+ TestReadAtTimestampImpl(10, generate);
+ TestReadAtTimestampImpl(3, generate);
}
Y_UNIT_TEST(TestWriteStat1stClass) {