summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <[email protected]>2025-10-25 12:58:54 +0500
committerGitHub <[email protected]>2025-10-25 12:58:54 +0500
commit18ae5ce1c6f04ccb4964b0f9041bbcb00537c7b1 (patch)
treed7a41479c4570876b5430c3ba3da4ce387f62941
parentab5870365b8c9a298dbe9b0dc9a71035a25f3898 (diff)
fixed MaxCommittedTimeLag (#27553)
-rw-r--r--ydb/core/persqueue/pqtablet/partition/partition.cpp9
-rw-r--r--ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp27
2 files changed, 18 insertions, 18 deletions
diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp
index a80703c1af8..d5f86d18c18 100644
--- a/ydb/core/persqueue/pqtablet/partition/partition.cpp
+++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp
@@ -817,9 +817,6 @@ TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const {
if (userInfo.Offset >= static_cast<i64>(GetEndOffset())) {
result.LastCommittedMessage.CreateTimestamp = now;
result.LastCommittedMessage.WriteTimestamp = now;
- } else if (userInfo.ActualTimestamps) {
- result.LastCommittedMessage.CreateTimestamp = userInfo.CreateTimestamp;
- result.LastCommittedMessage.WriteTimestamp = userInfo.WriteTimestamp;
} else {
auto timestamp = GetWriteTimeEstimate(userInfo.Offset);
result.LastCommittedMessage.CreateTimestamp = timestamp;
@@ -841,14 +838,14 @@ TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const {
result.LastReadMessage.WriteTimestamp = userInfo.ReadWriteTimestamp;
} else {
auto timestamp = GetWriteTimeEstimate(readOffset);
- result.LastCommittedMessage.CreateTimestamp = timestamp;
- result.LastCommittedMessage.WriteTimestamp = timestamp;
+ result.LastReadMessage.CreateTimestamp = timestamp;
+ result.LastReadMessage.WriteTimestamp = timestamp;
}
if (readOffset < (i64)GetEndOffset()) {
result.ReadLag = result.LastReadTimestamp - result.LastReadMessage.WriteTimestamp;
}
- result.CommitedLag = result.LastCommittedMessage.WriteTimestamp - now;
+ result.CommitedLag = now - result.LastCommittedMessage.WriteTimestamp;
result.TotalLag = TDuration::MilliSeconds(userInfo.GetWriteLagMs()) + result.ReadLag + (now - result.LastReadTimestamp);
return result;
diff --git a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp
index a8f658291b5..a54d837a03f 100644
--- a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp
+++ b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp
@@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
session->Close(TDuration::Seconds(5));
};
- // Check describe for empty topic
+ Cerr << ">>>>> Check describe for empty topic\n";
{
auto d = describe();
UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
@@ -75,9 +75,12 @@ Y_UNIT_TEST_SUITE(WithSDK) {
}
write(3);
+ Sleep(TDuration::Seconds(2));
write(7);
+ Sleep(TDuration::Seconds(2));
+ write(11);
- // Check describe for topic which contains messages, but consumer hasn`t read
+ Cerr << ">>>>> Check describe for topic which contains messages, but consumer hasn`t read\n";
{
auto d = describe();
UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
@@ -85,20 +88,20 @@ Y_UNIT_TEST_SUITE(WithSDK) {
auto& p = d.GetPartitions()[0];
UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
- UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset());
+ UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset());
auto& c = p.GetPartitionConsumerStats();
UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
UNIT_ASSERT_VALUES_EQUAL(0, c->GetCommittedOffset());
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); //
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
- UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?
UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset());
}
UNIT_ASSERT(setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1).IsSuccess());
- // Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example)
+ Cerr << ">>>>> Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example)\n";
{
auto d = describe();
UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
@@ -106,13 +109,13 @@ Y_UNIT_TEST_SUITE(WithSDK) {
auto& p = d.GetPartitions()[0];
UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
- UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset());
+ UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset());
auto& c = p.GetPartitionConsumerStats();
UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
- UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?
UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset());
}
@@ -143,7 +146,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
session->Close(TDuration::Seconds(1));
}
- // Check describe for topic wich contains messages, has commited offset of first message and read second message
+ Cerr << ">>>>> Check describe for topic wich contains messages, has commited offset of first message and read second message\n";
{
auto d = describe();
UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
@@ -151,15 +154,15 @@ Y_UNIT_TEST_SUITE(WithSDK) {
auto& p = d.GetPartitions()[0];
UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
- UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset());
+ UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset());
auto& c = p.GetPartitionConsumerStats();
UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());
- UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
+ //UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
- UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3));
- UNIT_ASSERT_VALUES_EQUAL(2, c->GetLastReadOffset());
+ UNIT_ASSERT_VALUES_EQUAL(3, c->GetLastReadOffset());
}
}