aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2025-04-09 16:44:42 +0500
committerGitHub <noreply@github.com>2025-04-09 16:44:42 +0500
commit651a677f8d638814bb2013fc87df737a92b279c9 (patch)
tree9c63354d4b150770837357dbe88d2dd617eab91b
parent13ca27d3cd2639beb3fd541fe64b07c9dd8193b7 (diff)
downloadydb-651a677f8d638814bb2013fc87df737a92b279c9.tar.gz
Added max_committed_time_lag for DescribeConsumer (#16857)
-rw-r--r--ydb/core/persqueue/partition.cpp150
-rw-r--r--ydb/core/persqueue/partition.h2
-rw-r--r--ydb/core/persqueue/partition_monitoring.cpp27
-rw-r--r--ydb/core/persqueue/partition_read.cpp12
-rw-r--r--ydb/core/persqueue/user_info.h59
-rw-r--r--ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp165
-rw-r--r--ydb/core/persqueue/ut/ut_with_sdk/ya.make1
-rw-r--r--ydb/core/protos/pqconfig.proto2
-rw-r--r--ydb/public/api/protos/ydb_topic.proto4
-rw-r--r--ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h2
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp5
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp21
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h3
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp5
14 files changed, 352 insertions, 106 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 40796d3dffc..4d12b2167ab 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -650,6 +650,54 @@ void TPartition::Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContex
}
+TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const {
+ auto now = TAppData::TimeProvider->Now();
+
+ userInfo.UpdateReadingTimeAndState(EndOffset, now);
+
+ TConsumerSnapshot result;
+ result.Now = now;
+
+ if (userInfo.Offset >= static_cast<i64>(EndOffset)) {
+ result.LastCommittedMessage.CreateTimestamp = now;
+ result.LastCommittedMessage.WriteTimestamp = now;
+ } else if (userInfo.ActualTimestamps) {
+ result.LastCommittedMessage.CreateTimestamp = userInfo.CreateTimestamp;
+ result.LastCommittedMessage.WriteTimestamp = userInfo.WriteTimestamp;
+ } else {
+ auto timestamp = GetWriteTimeEstimate(userInfo.Offset);
+ result.LastCommittedMessage.CreateTimestamp = timestamp;
+ result.LastCommittedMessage.WriteTimestamp = timestamp;
+ }
+
+ auto readOffset = userInfo.GetReadOffset();
+
+ result.ReadOffset = readOffset;
+ result.LastReadTimestamp = userInfo.ReadTimestamp;
+
+ if (readOffset >= static_cast<i64>(EndOffset)) {
+ result.LastReadMessage.CreateTimestamp = now;
+ result.LastReadMessage.WriteTimestamp = now;
+ } else if (userInfo.ReadOffset == -1) {
+ result.LastReadMessage = result.LastCommittedMessage;
+ } else if (userInfo.ReadWriteTimestamp) {
+ result.LastReadMessage.CreateTimestamp = userInfo.ReadCreateTimestamp;
+ result.LastReadMessage.WriteTimestamp = userInfo.ReadWriteTimestamp;
+ } else {
+ auto timestamp = GetWriteTimeEstimate(readOffset);
+ result.LastCommittedMessage.CreateTimestamp = timestamp;
+ result.LastCommittedMessage.WriteTimestamp = timestamp;
+ }
+
+ if (readOffset < (i64)EndOffset) {
+ result.ReadLag = result.LastReadTimestamp - result.LastReadMessage.WriteTimestamp;
+ }
+ result.CommitedLag = result.LastCommittedMessage.WriteTimestamp - now;
+ result.TotalLag = TDuration::MilliSeconds(userInfo.GetWriteLagMs()) + result.ReadLag + (now - result.LastReadTimestamp);
+
+ return result;
+}
+
void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) {
const auto now = ctx.Now();
@@ -696,7 +744,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
bool filterConsumers = !ev->Get()->Consumers.empty();
TSet<TString> requiredConsumers(ev->Get()->Consumers.begin(), ev->Get()->Consumers.end());
for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
- const auto& userInfo = userInfoPair.second;
+ auto& userInfo = userInfoPair.second;
auto& clientId = ev->Get()->ClientId;
bool consumerShouldBeProcessed = filterConsumers
? requiredConsumers.contains(userInfo.User)
@@ -719,46 +767,32 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
continue;
}
- auto estimateWriteTimestamp = [&]() {
- auto timestamp = userInfo.GetWriteTimestamp(EndOffset);
- if (!timestamp) {
- timestamp = GetWriteTimeEstimate(userInfo.Offset);
- }
- return timestamp;
- };
-
- auto estimateReadWriteTimestamp = [&]() {
- auto timestamp = userInfo.GetReadWriteTimestamp(EndOffset);
- if (!timestamp) {
- timestamp = GetWriteTimeEstimate(userInfo.GetReadOffset());
- };
- return timestamp;
- };
-
if (clientId == userInfo.User) { //fill lags
NKikimrPQ::TClientInfo* clientInfo = result.MutableLagsInfo();
clientInfo->SetClientId(userInfo.User);
+ auto snapshot = CreateSnapshot(userInfo);
+
auto write = clientInfo->MutableWritePosition();
write->SetOffset(userInfo.Offset);
- write->SetWriteTimestamp(estimateWriteTimestamp().MilliSeconds());
- write->SetCreateTimestamp(userInfo.GetCreateTimestamp(EndOffset).MilliSeconds());
+ write->SetWriteTimestamp(snapshot.LastCommittedMessage.WriteTimestamp.MilliSeconds());
+ write->SetCreateTimestamp(snapshot.LastCommittedMessage.CreateTimestamp.MilliSeconds());
write->SetSize(GetSizeLag(userInfo.Offset));
- auto read = clientInfo->MutableReadPosition();
- read->SetOffset(userInfo.GetReadOffset());
- read->SetWriteTimestamp(estimateReadWriteTimestamp().MilliSeconds());
- read->SetCreateTimestamp(userInfo.GetReadCreateTimestamp(EndOffset).MilliSeconds());
- read->SetSize(GetSizeLag(userInfo.GetReadOffset()));
+ auto readOffset = userInfo.GetReadOffset();
- clientInfo->SetLastReadTimestampMs(userInfo.GetReadTimestamp().MilliSeconds());
- if (IsActive() || userInfo.GetReadOffset() < (i64)EndOffset) {
- clientInfo->SetReadLagMs(userInfo.GetReadOffset() < (i64)EndOffset
- ? (userInfo.GetReadTimestamp() - TInstant::MilliSeconds(read->GetWriteTimestamp())).MilliSeconds()
- : 0);
+ auto read = clientInfo->MutableReadPosition();
+ read->SetOffset(readOffset);
+ read->SetWriteTimestamp(snapshot.LastReadMessage.WriteTimestamp.MilliSeconds());
+ read->SetCreateTimestamp(snapshot.LastReadMessage.CreateTimestamp.MilliSeconds());
+ read->SetSize(GetSizeLag(readOffset));
+
+ clientInfo->SetLastReadTimestampMs(snapshot.LastReadTimestamp.MilliSeconds());
+ clientInfo->SetCommitedLagMs(snapshot.CommitedLag.MilliSeconds());
+ if (IsActive() || readOffset < (i64)EndOffset) {
+ clientInfo->SetReadLagMs(snapshot.ReadLag.MilliSeconds());
clientInfo->SetWriteLagMs(userInfo.GetWriteLagMs());
- ui64 totalLag = clientInfo->GetReadLagMs() + userInfo.GetWriteLagMs() + (now - userInfo.GetReadTimestamp()).MilliSeconds();
- clientInfo->SetTotalLagMs(totalLag);
+ clientInfo->SetTotalLagMs(snapshot.TotalLag.MilliSeconds());
} else {
clientInfo->SetReadLagMs(0);
clientInfo->SetWriteLagMs(0);
@@ -767,14 +801,16 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
}
if (ev->Get()->GetStatForAllConsumers) { //fill lags
+ auto snapshot = CreateSnapshot(userInfo);
+
auto* clientInfo = result.AddConsumerResult();
clientInfo->SetConsumer(userInfo.User);
clientInfo->SetLastReadTimestampMs(userInfo.GetReadTimestamp().MilliSeconds());
+ clientInfo->SetCommitedLagMs(snapshot.CommitedLag.MilliSeconds());
- if (IsActive() || userInfo.GetReadOffset() < (i64)EndOffset) {
- clientInfo->SetReadLagMs(userInfo.GetReadOffset() < (i64)EndOffset
- ? (userInfo.GetReadTimestamp() - estimateReadWriteTimestamp()).MilliSeconds()
- : 0);
+ auto readOffset = userInfo.GetReadOffset();
+ if (IsActive() || readOffset < (i64)EndOffset) {
+ clientInfo->SetReadLagMs(snapshot.ReadLag.MilliSeconds());
clientInfo->SetWriteLagMs(userInfo.GetWriteLagMs());
} else {
clientInfo->SetReadLagMs(0);
@@ -862,20 +898,22 @@ void TPartition::Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActor
result.SetEndOffset(EndOffset);
result.SetResponseTimestamp(ctx.Now().MilliSeconds());
for (auto& pr : UsersInfoStorage->GetAll()) {
+ auto snapshot = CreateSnapshot(pr.second);
+
TUserInfo& userInfo(pr.second);
NKikimrPQ::TClientInfo& clientInfo = *result.AddClientInfo();
clientInfo.SetClientId(pr.first);
auto& write = *clientInfo.MutableWritePosition();
write.SetOffset(userInfo.Offset);
- write.SetWriteTimestamp((userInfo.GetWriteTimestamp(EndOffset) ? userInfo.GetWriteTimestamp(EndOffset) : GetWriteTimeEstimate(userInfo.Offset)).MilliSeconds());
- write.SetCreateTimestamp(userInfo.GetCreateTimestamp(EndOffset).MilliSeconds());
+ write.SetWriteTimestamp(snapshot.LastCommittedMessage.WriteTimestamp.MilliSeconds());
+ write.SetCreateTimestamp(snapshot.LastCommittedMessage.CreateTimestamp.MilliSeconds());
write.SetSize(GetSizeLag(userInfo.Offset));
auto& read = *clientInfo.MutableReadPosition();
read.SetOffset(userInfo.GetReadOffset());
- read.SetWriteTimestamp((userInfo.GetReadWriteTimestamp(EndOffset) ? userInfo.GetReadWriteTimestamp(EndOffset) : GetWriteTimeEstimate(userInfo.GetReadOffset())).MilliSeconds());
- read.SetCreateTimestamp(userInfo.GetReadCreateTimestamp(EndOffset).MilliSeconds());
+ read.SetWriteTimestamp(snapshot.LastReadMessage.WriteTimestamp.MilliSeconds());
+ read.SetCreateTimestamp(snapshot.LastReadMessage.CreateTimestamp.MilliSeconds());
read.SetSize(GetSizeLag(userInfo.GetReadOffset()));
}
ctx.Send(ev->Get()->Sender, response.Release(), 0, ev->Cookie);
@@ -1528,37 +1566,31 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
continue;
bool haveChanges = false;
- userInfo.UpdateReadingTimeAndState(EndOffset, now);
- ui64 ts = userInfo.GetWriteTimestamp(EndOffset).MilliSeconds();
+ auto snapshot = CreateSnapshot(userInfo);
+ auto ts = snapshot.LastCommittedMessage.WriteTimestamp.MilliSeconds();
if (ts < MIN_TIMESTAMP_MS) ts = Max<i64>();
if (userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_WRITE_TIME].Get() != ts) {
haveChanges = true;
userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_WRITE_TIME].Set(ts);
}
- ts = userInfo.GetCreateTimestamp(EndOffset).MilliSeconds();
+ ts = snapshot.LastCommittedMessage.CreateTimestamp.MilliSeconds();
if (ts < MIN_TIMESTAMP_MS) ts = Max<i64>();
if (userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_CREATE_TIME].Get() != ts) {
haveChanges = true;
userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_CREATE_TIME].Set(ts);
}
- ts = userInfo.GetReadWriteTimestamp(EndOffset).MilliSeconds();
- if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_WRITE_TIME].Get() != ts) {
+ auto readWriteTimestamp = snapshot.LastReadMessage.WriteTimestamp;
+ if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_WRITE_TIME].Get() != readWriteTimestamp.MilliSeconds()) {
haveChanges = true;
- userInfo.LabeledCounters->GetCounters()[METRIC_READ_WRITE_TIME].Set(ts);
+ userInfo.LabeledCounters->GetCounters()[METRIC_READ_WRITE_TIME].Set(readWriteTimestamp.MilliSeconds());
}
- i64 off = userInfo.GetReadOffset(); //we want to track first not-readed offset
- TInstant wts = userInfo.GetReadWriteTimestamp(EndOffset) ? userInfo.GetReadWriteTimestamp(EndOffset) : GetWriteTimeEstimate(userInfo.GetReadOffset());
- TInstant readTimestamp = userInfo.GetReadTimestamp();
- ui64 readTimeLag = off >= (i64)EndOffset ? 0 : (readTimestamp - wts).MilliSeconds();
- ui64 totalLag = userInfo.GetWriteLagMs() + readTimeLag + (now - readTimestamp).MilliSeconds();
-
- if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_TIME].Get() != totalLag) {
+ if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_TIME].Get() != snapshot.TotalLag.MilliSeconds()) {
haveChanges = true;
- userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_TIME].Set(totalLag);
+ userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_TIME].Set(snapshot.TotalLag.MilliSeconds());
}
- ts = readTimestamp.MilliSeconds();
+ ts = snapshot.LastReadTimestamp.MilliSeconds();
if (userInfo.LabeledCounters->GetCounters()[METRIC_LAST_READ_TIME].Get() != ts) {
haveChanges = true;
userInfo.LabeledCounters->GetCounters()[METRIC_LAST_READ_TIME].Set(ts);
@@ -1570,9 +1602,9 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
userInfo.LabeledCounters->GetCounters()[METRIC_WRITE_TIME_LAG].Set(timeLag);
}
- if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_TIME_LAG].Get() != readTimeLag) {
+ if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_TIME_LAG].Get() != snapshot.ReadLag.MilliSeconds()) {
haveChanges = true;
- userInfo.LabeledCounters->GetCounters()[METRIC_READ_TIME_LAG].Set(readTimeLag);
+ userInfo.LabeledCounters->GetCounters()[METRIC_READ_TIME_LAG].Set(snapshot.ReadLag.MilliSeconds());
}
if (userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_MESSAGE_LAG].Get() != EndOffset - userInfo.Offset) {
@@ -1580,10 +1612,10 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_MESSAGE_LAG].Set(EndOffset - userInfo.Offset);
}
- if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_MESSAGE_LAG].Get() != EndOffset - off) {
+ if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_MESSAGE_LAG].Get() != EndOffset - snapshot.ReadOffset) {
haveChanges = true;
- userInfo.LabeledCounters->GetCounters()[METRIC_READ_MESSAGE_LAG].Set(EndOffset - off);
- userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_MESSAGE_LAG].Set(EndOffset - off);
+ userInfo.LabeledCounters->GetCounters()[METRIC_READ_MESSAGE_LAG].Set(EndOffset - snapshot.ReadOffset);
+ userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_MESSAGE_LAG].Set(EndOffset - snapshot.ReadOffset);
}
ui64 sizeLag = GetSizeLag(userInfo.Offset);
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index b801c87ab2c..0752dcde37c 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -456,6 +456,8 @@ private:
ui64 GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const;
+ TConsumerSnapshot CreateSnapshot(TUserInfo& userInfo) const;
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
diff --git a/ydb/core/persqueue/partition_monitoring.cpp b/ydb/core/persqueue/partition_monitoring.cpp
index 582afdb234b..b4d8b61769e 100644
--- a/ydb/core/persqueue/partition_monitoring.cpp
+++ b/ydb/core/persqueue/partition_monitoring.cpp
@@ -232,20 +232,21 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
}
}
TABLEBODY() {
- for (auto& d: UsersInfoStorage->GetAll()) {
+ for (auto& [user, userInfo]: UsersInfoStorage->GetAll()) {
+ auto snapshot = CreateSnapshot(userInfo);
TABLER() {
- TABLED() {out << EncodeHtmlPcdata(d.first);}
- TABLED() {out << d.second.Offset;}
- TABLED() {out << (EndOffset - d.second.Offset);}
- TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.ReadFromTimestamp);}
- TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.WriteTimestamp);}
- TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.CreateTimestamp);}
- TABLED() {out << (d.second.GetReadOffset());}
- TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.GetReadWriteTimestamp(EndOffset));}
- TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.GetReadCreateTimestamp(EndOffset));}
- TABLED() {out << (d.second.ReadOffsetRewindSum);}
- TABLED() {out << d.second.ActiveReads;}
- TABLED() {out << d.second.Subscriptions;}
+ TABLED() {out << EncodeHtmlPcdata(user);}
+ TABLED() {out << userInfo.Offset;}
+ TABLED() {out << (EndOffset - userInfo.Offset);}
+ TABLED() {out << ToStringLocalTimeUpToSeconds(userInfo.ReadFromTimestamp);}
+ TABLED() {out << ToStringLocalTimeUpToSeconds(snapshot.LastCommittedMessage.WriteTimestamp);}
+ TABLED() {out << ToStringLocalTimeUpToSeconds(snapshot.LastCommittedMessage.WriteTimestamp);}
+ TABLED() {out << (userInfo.GetReadOffset());}
+ TABLED() {out << ToStringLocalTimeUpToSeconds(snapshot.LastReadMessage.WriteTimestamp);}
+ TABLED() {out << ToStringLocalTimeUpToSeconds(snapshot.LastReadMessage.CreateTimestamp);}
+ TABLED() {out << (userInfo.ReadOffsetRewindSum);}
+ TABLED() {out << userInfo.ActiveReads;}
+ TABLED() {out << userInfo.Subscriptions;}
}
}
}
diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp
index ba9355f62c4..f7af353f3e8 100644
--- a/ydb/core/persqueue/partition_read.cpp
+++ b/ydb/core/persqueue/partition_read.cpp
@@ -281,15 +281,13 @@ void TPartition::Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContex
if (!ev->Get()->ClientId.empty()) {
TUserInfo* userInfo = UsersInfoStorage->GetIfExists(ev->Get()->ClientId);
if (userInfo) {
- i64 offset = Max<i64>(userInfo->Offset, 0);
+ auto snapshot = CreateSnapshot(*userInfo);
result.SetClientOffset(userInfo->Offset);
- TInstant tmp = userInfo->GetWriteTimestamp(EndOffset) ? userInfo->GetWriteTimestamp(EndOffset) : GetWriteTimeEstimate(offset);
- result.SetWriteTimestampMS(tmp.MilliSeconds());
- result.SetCreateTimestampMS(userInfo->GetCreateTimestamp(EndOffset).MilliSeconds());
+ result.SetWriteTimestampMS(snapshot.LastCommittedMessage.WriteTimestamp.MilliSeconds());
+ result.SetCreateTimestampMS(snapshot.LastCommittedMessage.CreateTimestamp.MilliSeconds());
result.SetClientReadOffset(userInfo->GetReadOffset());
- tmp = userInfo->GetReadWriteTimestamp(EndOffset) ? userInfo->GetReadWriteTimestamp(EndOffset) : GetWriteTimeEstimate(userInfo->GetReadOffset());
- result.SetReadWriteTimestampMS(tmp.MilliSeconds());
- result.SetReadCreateTimestampMS(userInfo->GetReadCreateTimestamp(EndOffset).MilliSeconds());
+ result.SetReadWriteTimestampMS(snapshot.LastReadMessage.WriteTimestamp.MilliSeconds());
+ result.SetReadCreateTimestampMS(snapshot.LastReadMessage.CreateTimestamp.MilliSeconds());
}
}
ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionOffsetsResponse(result, Partition));
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 5aaad167d9f..86559394bd0 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -37,6 +37,25 @@ static const TString CLIENTID_WITHOUT_CONSUMER = "$without_consumer";
typedef TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor> TUserLabeledCounters;
+struct TMessageInfo {
+ TInstant CreateTimestamp;
+ TInstant WriteTimestamp;
+};
+
+struct TConsumerSnapshot {
+ TInstant Now;
+
+ TMessageInfo LastCommittedMessage;
+
+ i64 ReadOffset;
+ TInstant LastReadTimestamp;
+ TMessageInfo LastReadMessage;
+
+ TDuration ReadLag;
+ TDuration CommitedLag;
+ TDuration TotalLag;
+};
+
struct TUserInfoBase {
TString User;
ui64 ReadRuleGeneration = 0;
@@ -55,13 +74,20 @@ struct TUserInfoBase {
};
struct TUserInfo: public TUserInfoBase {
+ bool ActualTimestamps = false;
+ // WriteTimestamp of the last committed message
TInstant WriteTimestamp;
+ // CreateTimestamp of the last committed message
TInstant CreateTimestamp;
+
+ // Timstamp of the last read
TInstant ReadTimestamp;
- bool ActualTimestamps = false;
i64 ReadOffset = -1;
+
+ // WriteTimestamp of the last read message
TInstant ReadWriteTimestamp;
+ // CreateTimestamp of the last read message
TInstant ReadCreateTimestamp;
ui64 ReadOffsetRewindSum = 0;
@@ -175,13 +201,13 @@ struct TUserInfo: public TUserInfoBase {
)
: TUserInfoBase{user, readRuleGeneration, session, gen, step, offset, anyCommits, important,
readFromTimestamp, partitionSession, pipeClient}
- , WriteTimestamp(TAppData::TimeProvider->Now())
- , CreateTimestamp(TAppData::TimeProvider->Now())
- , ReadTimestamp(TAppData::TimeProvider->Now())
, ActualTimestamps(false)
+ , WriteTimestamp(TInstant::Zero())
+ , CreateTimestamp(TInstant::Zero())
+ , ReadTimestamp(TAppData::TimeProvider->Now())
, ReadOffset(-1)
- , ReadWriteTimestamp(TAppData::TimeProvider->Now())
- , ReadCreateTimestamp(TAppData::TimeProvider->Now())
+ , ReadWriteTimestamp(TInstant::Zero())
+ , ReadCreateTimestamp(TInstant::Zero())
, ReadOffsetRewindSum(readOffsetRewindSum)
, ReadScheduled(false)
, HasReadRule(false)
@@ -334,30 +360,9 @@ struct TUserInfo: public TUserInfoBase {
return ReadTimestamp;
}
- TInstant GetWriteTimestamp(i64 endOffset) const {
- return Offset == endOffset ? TAppData::TimeProvider->Now() : WriteTimestamp;
- }
-
- TInstant GetCreateTimestamp(i64 endOffset) const {
- return Offset == endOffset ? TAppData::TimeProvider->Now() : CreateTimestamp;
- }
-
- TInstant GetReadWriteTimestamp(i64 endOffset) const {
- TInstant ts = ReadOffset == -1 ? WriteTimestamp : ReadWriteTimestamp;
- ts = GetReadOffset() >= endOffset ? TAppData::TimeProvider->Now() : ts;
- return ts;
- }
-
ui64 GetWriteLagMs() const {
return WriteLagMs.GetValue();
}
-
- TInstant GetReadCreateTimestamp(i64 endOffset) const {
- TInstant ts = ReadOffset == -1 ? CreateTimestamp : ReadCreateTimestamp;
- ts = GetReadOffset() >= endOffset ? TAppData::TimeProvider->Now() : ts;
- return ts;
- }
-
};
class TUsersInfoStorage {
diff --git a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp
new file mode 100644
index 00000000000..da5d349a71f
--- /dev/null
+++ b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp
@@ -0,0 +1,165 @@
+#include <ydb/core/persqueue/ut/common/autoscaling_ut_common.h>
+
+#include <ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
+#include <ydb/core/persqueue/partition_scale_manager.h>
+#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
+#include <ydb/core/tx/schemeshard/ut_helpers/test_env.h>
+
+#include <util/stream/output.h>
+
+namespace NKikimr {
+
+using namespace NYdb::NTopic;
+using namespace NYdb::NTopic::NTests;
+using namespace NSchemeShardUT_Private;
+using namespace NKikimr::NPQ::NTest;
+
+#define UNIT_ASSERT_TIME_EQUAL(A, B, D) \
+ do { \
+ if (!(((A - B) >= TDuration::Zero()) && ((A - B) <= D)) \
+ && !(((B - A) >= TDuration::Zero()) && ((B - A) <= D))) { \
+ auto&& failMsg = Sprintf("%s and %s diferent more then %s", (::TStringBuilder() << A).data(), \
+ (::TStringBuilder() << B).data(), (::TStringBuilder() << D).data()); \
+ UNIT_FAIL_IMPL("assertion failure", failMsg); \
+ } \
+ } while (false)
+
+
+Y_UNIT_TEST_SUITE(WithSDK) {
+
+ Y_UNIT_TEST(DescribeConsumer) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ setup.CreateTopic(std::string{TEST_TOPIC}, std::string{TEST_CONSUMER}, 1);
+
+ auto describe = [&]() {
+ return setup.DescribeConsumer(TString{TEST_TOPIC}, TString{TEST_CONSUMER});
+ };
+
+ auto write = [&](size_t seqNo) {
+ TTopicClient client(setup.MakeDriver());
+
+ TWriteSessionSettings settings;
+ settings.Path(TEST_TOPIC);
+ settings.PartitionId(0);
+ settings.DeduplicationEnabled(false);
+ auto session = client.CreateSimpleBlockingWriteSession(settings);
+
+ TWriteMessage msg(TStringBuilder() << "message_" << seqNo);
+ msg.CreateTimestamp(TInstant::Now() - TDuration::Seconds(10 - seqNo));
+ UNIT_ASSERT(session->Write(std::move(msg)));
+
+ session->Close(TDuration::Seconds(5));
+ };
+
+ // Check describe for empty topic
+ {
+ auto d = describe();
+ UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
+ UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size());
+ auto& p = d.GetPartitions()[0];
+ UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
+ UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
+ UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionStats()->GetEndOffset());
+ auto& c = p.GetPartitionConsumerStats();
+ UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
+ UNIT_ASSERT_VALUES_EQUAL(0, c->GetCommittedOffset());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxWriteTimeLag());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
+ UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?
+ UNIT_ASSERT_VALUES_EQUAL(0, c->GetLastReadOffset());
+ }
+
+ write(3);
+ write(7);
+
+ // Check describe for topic which contains messages, but consumer hasn`t read
+ {
+ auto d = describe();
+ UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
+ UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size());
+ auto& p = d.GetPartitions()[0];
+ UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
+ UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
+ UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset());
+ auto& c = p.GetPartitionConsumerStats();
+ UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
+ UNIT_ASSERT_VALUES_EQUAL(0, c->GetCommittedOffset());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); //
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
+ UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?
+ UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset());
+ }
+
+ UNIT_ASSERT(setup.Commit(TString{TEST_TOPIC}, TEST_CONSUMER, 0, 1).IsSuccess());
+
+ // Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example)
+ {
+ auto d = describe();
+ UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
+ UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size());
+ auto& p = d.GetPartitions()[0];
+ UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
+ UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
+ UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset());
+ auto& c = p.GetPartitionConsumerStats();
+ UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
+ UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
+ UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?
+ UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset());
+ }
+
+ {
+ TTopicClient client(setup.MakeDriver());
+ TReadSessionSettings settings;
+ settings.ConsumerName(TEST_CONSUMER);
+ settings.AppendTopics(TTopicReadSettings().Path(TEST_TOPIC));
+
+ auto session = client.CreateReadSession(settings);
+
+ TInstant endTime = TInstant::Now() + TDuration::Seconds(5);
+ while (true) {
+ auto e = session->GetEvent();
+ if (e) {
+ Cerr << ">>>>> Event = " << e->index() << Endl << Flush;
+ }
+ if (e && std::holds_alternative<TReadSessionEvent::TDataReceivedEvent>(e.value())) {
+ // we must recive only one date event with second message
+ break;
+ } else if (e && std::holds_alternative<TReadSessionEvent::TStartPartitionSessionEvent>(e.value())) {
+ std::get<TReadSessionEvent::TStartPartitionSessionEvent>(e.value()).Confirm();
+ }
+ UNIT_ASSERT_C(endTime > TInstant::Now(), "Unable wait");
+ }
+ }
+
+ // Check describe for topic wich contains messages, has commited offset of first message and read second message
+ {
+ auto d = describe();
+ UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
+ UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size());
+ auto& p = d.GetPartitions()[0];
+ UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
+ UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
+ UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset());
+ auto& c = p.GetPartitionConsumerStats();
+ UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
+ UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
+ UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3));
+ UNIT_ASSERT_VALUES_EQUAL(2, c->GetLastReadOffset());
+ }
+
+ }
+}
+
+} // namespace NKikimr
diff --git a/ydb/core/persqueue/ut/ut_with_sdk/ya.make b/ydb/core/persqueue/ut/ut_with_sdk/ya.make
index fdb6c2ff405..eb370dff024 100644
--- a/ydb/core/persqueue/ut/ut_with_sdk/ya.make
+++ b/ydb/core/persqueue/ut/ut_with_sdk/ya.make
@@ -32,6 +32,7 @@ SRCS(
autoscaling_ut.cpp
balancing_ut.cpp
mirrorer_ut.cpp
+ topic_ut.cpp
)
END()
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 2f27923c359..b2c682c588f 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -751,6 +751,7 @@ message TClientInfo {
optional uint64 ReadLagMs = 5;
optional uint64 LastReadTimestampMs = 8;
optional uint64 TotalLagMs = 9;
+ optional uint64 CommitedLagMs = 10;
}
@@ -851,6 +852,7 @@ message TStatusResponse {
optional uint64 CommitedOffset = 9;
optional bool ReadingFinished = 10;
+ optional uint64 CommitedLagMs = 11;
}
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index a45a8c67314..5ac93836d15 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -810,6 +810,8 @@ message Consumer {
google.protobuf.Duration max_read_time_lag = 2;
// Maximum of differences between write timestamp and create timestamp for all messages, read during last minute.
google.protobuf.Duration max_write_time_lag = 3;
+ // The difference between the write timestamp of the last commited message and the current time.
+ google.protobuf.Duration max_committed_time_lag = 5;
// Bytes read statistics.
MultipleWindowsStat bytes_read = 4;
}
@@ -1214,6 +1216,8 @@ message DescribeConsumerResult {
google.protobuf.Duration max_read_time_lag = 6;
// Maximum of differences between write timestamp and create timestamp for all messages, read during last minute.
google.protobuf.Duration max_write_time_lag = 7;
+ // The difference between the write timestamp of the last commited message and the current time.
+ google.protobuf.Duration max_committed_time_lag = 13;
// How much bytes were read during several windows statistics from this partition.
MultipleWindowsStat bytes_read = 8;
diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h
index c776e3f5b45..09611b1e07c 100644
--- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h
+++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h
@@ -111,6 +111,7 @@ public:
const TInstant& GetLastReadTime() const;
const TDuration& GetMaxReadTimeLag() const;
const TDuration& GetMaxWriteTimeLag() const;
+ const TDuration& GetMaxCommittedTimeLag() const;
private:
uint64_t CommittedOffset_;
@@ -120,6 +121,7 @@ private:
TInstant LastReadTime_;
TDuration MaxReadTimeLag_;
TDuration MaxWriteTimeLag_;
+ TDuration MaxCommittedTimeLag_;
};
// Topic partition location
diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp b/ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp
index 280cd800b59..1717f29cd7e 100644
--- a/ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp
+++ b/ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp
@@ -376,6 +376,7 @@ TPartitionConsumerStats::TPartitionConsumerStats(const Ydb::Topic::DescribeConsu
, LastReadTime_(TInstant::Seconds(partitionStats.last_read_time().seconds()))
, MaxReadTimeLag_(TDuration::Seconds(partitionStats.max_read_time_lag().seconds()))
, MaxWriteTimeLag_(TDuration::Seconds(partitionStats.max_write_time_lag().seconds()))
+ , MaxCommittedTimeLag_(TDuration::Seconds(partitionStats.max_committed_time_lag().seconds()))
{}
uint64_t TPartitionConsumerStats::GetCommittedOffset() const {
@@ -406,6 +407,10 @@ const TDuration& TPartitionConsumerStats::GetMaxWriteTimeLag() const {
return MaxWriteTimeLag_;
}
+const TDuration& TPartitionConsumerStats::GetMaxCommittedTimeLag() const {
+ return MaxCommittedTimeLag_;
+}
+
TPartitionLocation::TPartitionLocation(const Ydb::Topic::PartitionLocation& partitionLocation)
: NodeId_(partitionLocation.node_id())
, Generation_(partitionLocation.generation())
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp
index c3253d1ea76..b61ec85322e 100644
--- a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp
@@ -66,6 +66,27 @@ TTopicDescription TTopicSdkTestSetup::DescribeTopic(const TString& path)
return status.GetTopicDescription();
}
+TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const TString& path, const TString& consumer)
+{
+ TTopicClient client(MakeDriver());
+
+ TDescribeConsumerSettings settings;
+ settings.IncludeStats(true);
+ settings.IncludeLocation(true);
+
+ auto status = client.DescribeConsumer(path, consumer, settings).GetValueSync();
+ UNIT_ASSERT(status.IsSuccess());
+
+ return status.GetConsumerDescription();
+}
+
+TStatus TTopicSdkTestSetup::Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset) {
+ TTopicClient client(MakeDriver());
+
+ return client.CommitOffset(path, partitionId, consumerName, offset).GetValueSync();
+}
+
+
TString TTopicSdkTestSetup::GetEndpoint() const {
return "localhost:" + ToString(Server.GrpcPort);
}
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h
index 3473b883b7c..4b6ee30e296 100644
--- a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h
@@ -22,6 +22,9 @@ public:
size_t maxPartitionCount = 100);
TTopicDescription DescribeTopic(const TString& path = TString{TEST_TOPIC});
+ TConsumerDescription DescribeConsumer(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TString{TEST_CONSUMER});
+
+ TStatus Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset);
TString GetEndpoint() const;
TString GetTopicPath(const TString& name = TString{TEST_TOPIC}) const;
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index bfd6519c95a..fd1ac8a47a3 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -874,12 +874,14 @@ void TDescribeTopicActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPer
SetProtoTime(stats->mutable_min_partitions_last_read_time(), cons.GetLastReadTimestampMs());
SetProtoTime(stats->mutable_max_read_time_lag(), cons.GetReadLagMs());
SetProtoTime(stats->mutable_max_write_time_lag(), cons.GetWriteLagMs());
+ SetProtoTime(stats->mutable_max_committed_time_lag(), cons.GetCommitedLagMs());
} else {
auto* stats = it->second->mutable_consumer_stats();
UpdateProtoTime(stats->mutable_min_partitions_last_read_time(), cons.GetLastReadTimestampMs(), true);
UpdateProtoTime(stats->mutable_max_read_time_lag(), cons.GetReadLagMs(), false);
UpdateProtoTime(stats->mutable_max_write_time_lag(), cons.GetWriteLagMs(), false);
+ UpdateProtoTime(stats->mutable_max_committed_time_lag(), cons.GetCommitedLagMs(), false);
}
AddWindowsStat(it->second->mutable_consumer_stats()->mutable_bytes_read(), cons.GetAvgReadSpeedPerMin(), cons.GetAvgReadSpeedPerHour(), cons.GetAvgReadSpeedPerDay());
@@ -988,6 +990,7 @@ void TDescribeConsumerActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEv
SetProtoTime(consStats->mutable_last_read_time(), partResult.GetLagsInfo().GetLastReadTimestampMs());
SetProtoTime(consStats->mutable_max_read_time_lag(), partResult.GetLagsInfo().GetReadLagMs());
SetProtoTime(consStats->mutable_max_write_time_lag(), partResult.GetLagsInfo().GetWriteLagMs());
+ SetProtoTime(consStats->mutable_max_committed_time_lag(), partResult.GetLagsInfo().GetCommitedLagMs());
AddWindowsStat(consStats->mutable_bytes_read(), partResult.GetAvgReadSpeedPerMin(), partResult.GetAvgReadSpeedPerHour(), partResult.GetAvgReadSpeedPerDay());
@@ -997,12 +1000,14 @@ void TDescribeConsumerActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEv
SetProtoTime(stats->mutable_min_partitions_last_read_time(), partResult.GetLagsInfo().GetLastReadTimestampMs());
SetProtoTime(stats->mutable_max_read_time_lag(), partResult.GetLagsInfo().GetReadLagMs());
SetProtoTime(stats->mutable_max_write_time_lag(), partResult.GetLagsInfo().GetWriteLagMs());
+ SetProtoTime(stats->mutable_max_committed_time_lag(), partResult.GetLagsInfo().GetCommitedLagMs());
} else {
auto* stats = Result.mutable_consumer()->mutable_consumer_stats();
UpdateProtoTime(stats->mutable_min_partitions_last_read_time(), partResult.GetLagsInfo().GetLastReadTimestampMs(), true);
UpdateProtoTime(stats->mutable_max_read_time_lag(), partResult.GetLagsInfo().GetReadLagMs(), false);
UpdateProtoTime(stats->mutable_max_write_time_lag(), partResult.GetLagsInfo().GetWriteLagMs(), false);
+ UpdateProtoTime(stats->mutable_max_committed_time_lag(), partResult.GetLagsInfo().GetCommitedLagMs(), false);
}
}
}