diff options
| author | Nikolay Shestakov <[email protected]> | 2025-10-25 12:58:54 +0500 | 
|---|---|---|
| committer | GitHub <[email protected]> | 2025-10-25 12:58:54 +0500 | 
| commit | 18ae5ce1c6f04ccb4964b0f9041bbcb00537c7b1 (patch) | |
| tree | d7a41479c4570876b5430c3ba3da4ce387f62941 | |
| parent | ab5870365b8c9a298dbe9b0dc9a71035a25f3898 (diff) | |
fixed MaxCommittedTimeLag (#27553)
| -rw-r--r-- | ydb/core/persqueue/pqtablet/partition/partition.cpp | 9 | ||||
| -rw-r--r-- | ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp | 27 | 
2 files changed, 18 insertions, 18 deletions
| diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index a80703c1af8..d5f86d18c18 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -817,9 +817,6 @@ TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const {      if (userInfo.Offset >= static_cast<i64>(GetEndOffset())) {          result.LastCommittedMessage.CreateTimestamp = now;          result.LastCommittedMessage.WriteTimestamp = now; -    } else if (userInfo.ActualTimestamps) { -        result.LastCommittedMessage.CreateTimestamp = userInfo.CreateTimestamp; -        result.LastCommittedMessage.WriteTimestamp = userInfo.WriteTimestamp;      } else {          auto timestamp = GetWriteTimeEstimate(userInfo.Offset);          result.LastCommittedMessage.CreateTimestamp = timestamp; @@ -841,14 +838,14 @@ TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const {          result.LastReadMessage.WriteTimestamp = userInfo.ReadWriteTimestamp;      } else {          auto timestamp = GetWriteTimeEstimate(readOffset); -        result.LastCommittedMessage.CreateTimestamp = timestamp; -        result.LastCommittedMessage.WriteTimestamp = timestamp; +        result.LastReadMessage.CreateTimestamp = timestamp; +        result.LastReadMessage.WriteTimestamp = timestamp;      }      if (readOffset < (i64)GetEndOffset()) {          result.ReadLag = result.LastReadTimestamp - result.LastReadMessage.WriteTimestamp;      } -    result.CommitedLag = result.LastCommittedMessage.WriteTimestamp - now; +    result.CommitedLag = now - result.LastCommittedMessage.WriteTimestamp;      result.TotalLag = TDuration::MilliSeconds(userInfo.GetWriteLagMs()) + result.ReadLag + (now - result.LastReadTimestamp);      return result; diff --git a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp index a8f658291b5..a54d837a03f 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp @@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {              session->Close(TDuration::Seconds(5));          }; -        // Check describe for empty topic +        Cerr << ">>>>> Check describe for empty topic\n";          {              auto d = describe();              UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName()); @@ -75,9 +75,12 @@ Y_UNIT_TEST_SUITE(WithSDK) {          }          write(3); +        Sleep(TDuration::Seconds(2));          write(7); +        Sleep(TDuration::Seconds(2)); +        write(11); -        // Check describe for topic which contains messages, but consumer hasn`t read +        Cerr << ">>>>> Check describe for topic which contains messages, but consumer hasn`t read\n";          {              auto d = describe();              UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName()); @@ -85,20 +88,20 @@ Y_UNIT_TEST_SUITE(WithSDK) {              auto& p = d.GetPartitions()[0];              UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());              UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive()); -            UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset()); +            UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset());              auto& c = p.GetPartitionConsumerStats();              UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());              UNIT_ASSERT_VALUES_EQUAL(0, c->GetCommittedOffset());              UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); //              UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag()); -            UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag()); +            UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());              UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?              UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset());          }          UNIT_ASSERT(setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1).IsSuccess()); -        // Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example) +        Cerr << ">>>>> Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example)\n";          {              auto d = describe();              UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName()); @@ -106,13 +109,13 @@ Y_UNIT_TEST_SUITE(WithSDK) {              auto& p = d.GetPartitions()[0];              UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());              UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive()); -            UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset()); +            UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset());              auto& c = p.GetPartitionConsumerStats();              UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());              UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());              UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());              UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag()); -            UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag()); +            UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());              UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?              UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset());          } @@ -143,7 +146,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {              session->Close(TDuration::Seconds(1));          } -        // Check describe for topic wich contains messages, has commited offset of first message and read second message +        Cerr << ">>>>> Check describe for topic wich contains messages, has commited offset of first message and read second message\n";          {              auto d = describe();              UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName()); @@ -151,15 +154,15 @@ Y_UNIT_TEST_SUITE(WithSDK) {              auto& p = d.GetPartitions()[0];              UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());              UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive()); -            UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset()); +            UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset());              auto& c = p.GetPartitionConsumerStats();              UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());              UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset()); -            UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); +            //UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());              UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag()); -            UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag()); +            UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());              UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); -            UNIT_ASSERT_VALUES_EQUAL(2, c->GetLastReadOffset()); +            UNIT_ASSERT_VALUES_EQUAL(3, c->GetLastReadOffset());          }      } | 
