diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2025-04-09 16:44:42 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-09 16:44:42 +0500 |
commit | 651a677f8d638814bb2013fc87df737a92b279c9 (patch) | |
tree | 9c63354d4b150770837357dbe88d2dd617eab91b | |
parent | 13ca27d3cd2639beb3fd541fe64b07c9dd8193b7 (diff) | |
download | ydb-651a677f8d638814bb2013fc87df737a92b279c9.tar.gz |
Added max_committed_time_lag for DescribeConsumer (#16857)
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 150 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_monitoring.cpp | 27 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_read.cpp | 12 | ||||
-rw-r--r-- | ydb/core/persqueue/user_info.h | 59 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp | 165 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/ut_with_sdk/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 2 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_topic.proto | 4 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h | 2 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp | 5 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp | 21 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h | 3 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/schema_actors.cpp | 5 |
14 files changed, 352 insertions, 106 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 40796d3dffc..4d12b2167ab 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -650,6 +650,54 @@ void TPartition::Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContex } +TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const { + auto now = TAppData::TimeProvider->Now(); + + userInfo.UpdateReadingTimeAndState(EndOffset, now); + + TConsumerSnapshot result; + result.Now = now; + + if (userInfo.Offset >= static_cast<i64>(EndOffset)) { + result.LastCommittedMessage.CreateTimestamp = now; + result.LastCommittedMessage.WriteTimestamp = now; + } else if (userInfo.ActualTimestamps) { + result.LastCommittedMessage.CreateTimestamp = userInfo.CreateTimestamp; + result.LastCommittedMessage.WriteTimestamp = userInfo.WriteTimestamp; + } else { + auto timestamp = GetWriteTimeEstimate(userInfo.Offset); + result.LastCommittedMessage.CreateTimestamp = timestamp; + result.LastCommittedMessage.WriteTimestamp = timestamp; + } + + auto readOffset = userInfo.GetReadOffset(); + + result.ReadOffset = readOffset; + result.LastReadTimestamp = userInfo.ReadTimestamp; + + if (readOffset >= static_cast<i64>(EndOffset)) { + result.LastReadMessage.CreateTimestamp = now; + result.LastReadMessage.WriteTimestamp = now; + } else if (userInfo.ReadOffset == -1) { + result.LastReadMessage = result.LastCommittedMessage; + } else if (userInfo.ReadWriteTimestamp) { + result.LastReadMessage.CreateTimestamp = userInfo.ReadCreateTimestamp; + result.LastReadMessage.WriteTimestamp = userInfo.ReadWriteTimestamp; + } else { + auto timestamp = GetWriteTimeEstimate(readOffset); + result.LastCommittedMessage.CreateTimestamp = timestamp; + result.LastCommittedMessage.WriteTimestamp = timestamp; + } + + if (readOffset < (i64)EndOffset) { + result.ReadLag = result.LastReadTimestamp - result.LastReadMessage.WriteTimestamp; + } + result.CommitedLag = result.LastCommittedMessage.WriteTimestamp - now; + result.TotalLag = TDuration::MilliSeconds(userInfo.GetWriteLagMs()) + result.ReadLag + (now - result.LastReadTimestamp); + + return result; +} + void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) { const auto now = ctx.Now(); @@ -696,7 +744,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext bool filterConsumers = !ev->Get()->Consumers.empty(); TSet<TString> requiredConsumers(ev->Get()->Consumers.begin(), ev->Get()->Consumers.end()); for (auto& userInfoPair : UsersInfoStorage->GetAll()) { - const auto& userInfo = userInfoPair.second; + auto& userInfo = userInfoPair.second; auto& clientId = ev->Get()->ClientId; bool consumerShouldBeProcessed = filterConsumers ? requiredConsumers.contains(userInfo.User) @@ -719,46 +767,32 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext continue; } - auto estimateWriteTimestamp = [&]() { - auto timestamp = userInfo.GetWriteTimestamp(EndOffset); - if (!timestamp) { - timestamp = GetWriteTimeEstimate(userInfo.Offset); - } - return timestamp; - }; - - auto estimateReadWriteTimestamp = [&]() { - auto timestamp = userInfo.GetReadWriteTimestamp(EndOffset); - if (!timestamp) { - timestamp = GetWriteTimeEstimate(userInfo.GetReadOffset()); - }; - return timestamp; - }; - if (clientId == userInfo.User) { //fill lags NKikimrPQ::TClientInfo* clientInfo = result.MutableLagsInfo(); clientInfo->SetClientId(userInfo.User); + auto snapshot = CreateSnapshot(userInfo); + auto write = clientInfo->MutableWritePosition(); write->SetOffset(userInfo.Offset); - write->SetWriteTimestamp(estimateWriteTimestamp().MilliSeconds()); - write->SetCreateTimestamp(userInfo.GetCreateTimestamp(EndOffset).MilliSeconds()); + write->SetWriteTimestamp(snapshot.LastCommittedMessage.WriteTimestamp.MilliSeconds()); + write->SetCreateTimestamp(snapshot.LastCommittedMessage.CreateTimestamp.MilliSeconds()); write->SetSize(GetSizeLag(userInfo.Offset)); - auto read = clientInfo->MutableReadPosition(); - read->SetOffset(userInfo.GetReadOffset()); - read->SetWriteTimestamp(estimateReadWriteTimestamp().MilliSeconds()); - read->SetCreateTimestamp(userInfo.GetReadCreateTimestamp(EndOffset).MilliSeconds()); - read->SetSize(GetSizeLag(userInfo.GetReadOffset())); + auto readOffset = userInfo.GetReadOffset(); - clientInfo->SetLastReadTimestampMs(userInfo.GetReadTimestamp().MilliSeconds()); - if (IsActive() || userInfo.GetReadOffset() < (i64)EndOffset) { - clientInfo->SetReadLagMs(userInfo.GetReadOffset() < (i64)EndOffset - ? (userInfo.GetReadTimestamp() - TInstant::MilliSeconds(read->GetWriteTimestamp())).MilliSeconds() - : 0); + auto read = clientInfo->MutableReadPosition(); + read->SetOffset(readOffset); + read->SetWriteTimestamp(snapshot.LastReadMessage.WriteTimestamp.MilliSeconds()); + read->SetCreateTimestamp(snapshot.LastReadMessage.CreateTimestamp.MilliSeconds()); + read->SetSize(GetSizeLag(readOffset)); + + clientInfo->SetLastReadTimestampMs(snapshot.LastReadTimestamp.MilliSeconds()); + clientInfo->SetCommitedLagMs(snapshot.CommitedLag.MilliSeconds()); + if (IsActive() || readOffset < (i64)EndOffset) { + clientInfo->SetReadLagMs(snapshot.ReadLag.MilliSeconds()); clientInfo->SetWriteLagMs(userInfo.GetWriteLagMs()); - ui64 totalLag = clientInfo->GetReadLagMs() + userInfo.GetWriteLagMs() + (now - userInfo.GetReadTimestamp()).MilliSeconds(); - clientInfo->SetTotalLagMs(totalLag); + clientInfo->SetTotalLagMs(snapshot.TotalLag.MilliSeconds()); } else { clientInfo->SetReadLagMs(0); clientInfo->SetWriteLagMs(0); @@ -767,14 +801,16 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext } if (ev->Get()->GetStatForAllConsumers) { //fill lags + auto snapshot = CreateSnapshot(userInfo); + auto* clientInfo = result.AddConsumerResult(); clientInfo->SetConsumer(userInfo.User); clientInfo->SetLastReadTimestampMs(userInfo.GetReadTimestamp().MilliSeconds()); + clientInfo->SetCommitedLagMs(snapshot.CommitedLag.MilliSeconds()); - if (IsActive() || userInfo.GetReadOffset() < (i64)EndOffset) { - clientInfo->SetReadLagMs(userInfo.GetReadOffset() < (i64)EndOffset - ? (userInfo.GetReadTimestamp() - estimateReadWriteTimestamp()).MilliSeconds() - : 0); + auto readOffset = userInfo.GetReadOffset(); + if (IsActive() || readOffset < (i64)EndOffset) { + clientInfo->SetReadLagMs(snapshot.ReadLag.MilliSeconds()); clientInfo->SetWriteLagMs(userInfo.GetWriteLagMs()); } else { clientInfo->SetReadLagMs(0); @@ -862,20 +898,22 @@ void TPartition::Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActor result.SetEndOffset(EndOffset); result.SetResponseTimestamp(ctx.Now().MilliSeconds()); for (auto& pr : UsersInfoStorage->GetAll()) { + auto snapshot = CreateSnapshot(pr.second); + TUserInfo& userInfo(pr.second); NKikimrPQ::TClientInfo& clientInfo = *result.AddClientInfo(); clientInfo.SetClientId(pr.first); auto& write = *clientInfo.MutableWritePosition(); write.SetOffset(userInfo.Offset); - write.SetWriteTimestamp((userInfo.GetWriteTimestamp(EndOffset) ? userInfo.GetWriteTimestamp(EndOffset) : GetWriteTimeEstimate(userInfo.Offset)).MilliSeconds()); - write.SetCreateTimestamp(userInfo.GetCreateTimestamp(EndOffset).MilliSeconds()); + write.SetWriteTimestamp(snapshot.LastCommittedMessage.WriteTimestamp.MilliSeconds()); + write.SetCreateTimestamp(snapshot.LastCommittedMessage.CreateTimestamp.MilliSeconds()); write.SetSize(GetSizeLag(userInfo.Offset)); auto& read = *clientInfo.MutableReadPosition(); read.SetOffset(userInfo.GetReadOffset()); - read.SetWriteTimestamp((userInfo.GetReadWriteTimestamp(EndOffset) ? userInfo.GetReadWriteTimestamp(EndOffset) : GetWriteTimeEstimate(userInfo.GetReadOffset())).MilliSeconds()); - read.SetCreateTimestamp(userInfo.GetReadCreateTimestamp(EndOffset).MilliSeconds()); + read.SetWriteTimestamp(snapshot.LastReadMessage.WriteTimestamp.MilliSeconds()); + read.SetCreateTimestamp(snapshot.LastReadMessage.CreateTimestamp.MilliSeconds()); read.SetSize(GetSizeLag(userInfo.GetReadOffset())); } ctx.Send(ev->Get()->Sender, response.Release(), 0, ev->Cookie); @@ -1528,37 +1566,31 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) { if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important) continue; bool haveChanges = false; - userInfo.UpdateReadingTimeAndState(EndOffset, now); - ui64 ts = userInfo.GetWriteTimestamp(EndOffset).MilliSeconds(); + auto snapshot = CreateSnapshot(userInfo); + auto ts = snapshot.LastCommittedMessage.WriteTimestamp.MilliSeconds(); if (ts < MIN_TIMESTAMP_MS) ts = Max<i64>(); if (userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_WRITE_TIME].Get() != ts) { haveChanges = true; userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_WRITE_TIME].Set(ts); } - ts = userInfo.GetCreateTimestamp(EndOffset).MilliSeconds(); + ts = snapshot.LastCommittedMessage.CreateTimestamp.MilliSeconds(); if (ts < MIN_TIMESTAMP_MS) ts = Max<i64>(); if (userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_CREATE_TIME].Get() != ts) { haveChanges = true; userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_CREATE_TIME].Set(ts); } - ts = userInfo.GetReadWriteTimestamp(EndOffset).MilliSeconds(); - if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_WRITE_TIME].Get() != ts) { + auto readWriteTimestamp = snapshot.LastReadMessage.WriteTimestamp; + if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_WRITE_TIME].Get() != readWriteTimestamp.MilliSeconds()) { haveChanges = true; - userInfo.LabeledCounters->GetCounters()[METRIC_READ_WRITE_TIME].Set(ts); + userInfo.LabeledCounters->GetCounters()[METRIC_READ_WRITE_TIME].Set(readWriteTimestamp.MilliSeconds()); } - i64 off = userInfo.GetReadOffset(); //we want to track first not-readed offset - TInstant wts = userInfo.GetReadWriteTimestamp(EndOffset) ? userInfo.GetReadWriteTimestamp(EndOffset) : GetWriteTimeEstimate(userInfo.GetReadOffset()); - TInstant readTimestamp = userInfo.GetReadTimestamp(); - ui64 readTimeLag = off >= (i64)EndOffset ? 0 : (readTimestamp - wts).MilliSeconds(); - ui64 totalLag = userInfo.GetWriteLagMs() + readTimeLag + (now - readTimestamp).MilliSeconds(); - - if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_TIME].Get() != totalLag) { + if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_TIME].Get() != snapshot.TotalLag.MilliSeconds()) { haveChanges = true; - userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_TIME].Set(totalLag); + userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_TIME].Set(snapshot.TotalLag.MilliSeconds()); } - ts = readTimestamp.MilliSeconds(); + ts = snapshot.LastReadTimestamp.MilliSeconds(); if (userInfo.LabeledCounters->GetCounters()[METRIC_LAST_READ_TIME].Get() != ts) { haveChanges = true; userInfo.LabeledCounters->GetCounters()[METRIC_LAST_READ_TIME].Set(ts); @@ -1570,9 +1602,9 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) { userInfo.LabeledCounters->GetCounters()[METRIC_WRITE_TIME_LAG].Set(timeLag); } - if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_TIME_LAG].Get() != readTimeLag) { + if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_TIME_LAG].Get() != snapshot.ReadLag.MilliSeconds()) { haveChanges = true; - userInfo.LabeledCounters->GetCounters()[METRIC_READ_TIME_LAG].Set(readTimeLag); + userInfo.LabeledCounters->GetCounters()[METRIC_READ_TIME_LAG].Set(snapshot.ReadLag.MilliSeconds()); } if (userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_MESSAGE_LAG].Get() != EndOffset - userInfo.Offset) { @@ -1580,10 +1612,10 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) { userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_MESSAGE_LAG].Set(EndOffset - userInfo.Offset); } - if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_MESSAGE_LAG].Get() != EndOffset - off) { + if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_MESSAGE_LAG].Get() != EndOffset - snapshot.ReadOffset) { haveChanges = true; - userInfo.LabeledCounters->GetCounters()[METRIC_READ_MESSAGE_LAG].Set(EndOffset - off); - userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_MESSAGE_LAG].Set(EndOffset - off); + userInfo.LabeledCounters->GetCounters()[METRIC_READ_MESSAGE_LAG].Set(EndOffset - snapshot.ReadOffset); + userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_MESSAGE_LAG].Set(EndOffset - snapshot.ReadOffset); } ui64 sizeLag = GetSizeLag(userInfo.Offset); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index b801c87ab2c..0752dcde37c 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -456,6 +456,8 @@ private: ui64 GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const; + TConsumerSnapshot CreateSnapshot(TUserInfo& userInfo) const; + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR; diff --git a/ydb/core/persqueue/partition_monitoring.cpp b/ydb/core/persqueue/partition_monitoring.cpp index 582afdb234b..b4d8b61769e 100644 --- a/ydb/core/persqueue/partition_monitoring.cpp +++ b/ydb/core/persqueue/partition_monitoring.cpp @@ -232,20 +232,21 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo } } TABLEBODY() { - for (auto& d: UsersInfoStorage->GetAll()) { + for (auto& [user, userInfo]: UsersInfoStorage->GetAll()) { + auto snapshot = CreateSnapshot(userInfo); TABLER() { - TABLED() {out << EncodeHtmlPcdata(d.first);} - TABLED() {out << d.second.Offset;} - TABLED() {out << (EndOffset - d.second.Offset);} - TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.ReadFromTimestamp);} - TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.WriteTimestamp);} - TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.CreateTimestamp);} - TABLED() {out << (d.second.GetReadOffset());} - TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.GetReadWriteTimestamp(EndOffset));} - TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.GetReadCreateTimestamp(EndOffset));} - TABLED() {out << (d.second.ReadOffsetRewindSum);} - TABLED() {out << d.second.ActiveReads;} - TABLED() {out << d.second.Subscriptions;} + TABLED() {out << EncodeHtmlPcdata(user);} + TABLED() {out << userInfo.Offset;} + TABLED() {out << (EndOffset - userInfo.Offset);} + TABLED() {out << ToStringLocalTimeUpToSeconds(userInfo.ReadFromTimestamp);} + TABLED() {out << ToStringLocalTimeUpToSeconds(snapshot.LastCommittedMessage.WriteTimestamp);} + TABLED() {out << ToStringLocalTimeUpToSeconds(snapshot.LastCommittedMessage.WriteTimestamp);} + TABLED() {out << (userInfo.GetReadOffset());} + TABLED() {out << ToStringLocalTimeUpToSeconds(snapshot.LastReadMessage.WriteTimestamp);} + TABLED() {out << ToStringLocalTimeUpToSeconds(snapshot.LastReadMessage.CreateTimestamp);} + TABLED() {out << (userInfo.ReadOffsetRewindSum);} + TABLED() {out << userInfo.ActiveReads;} + TABLED() {out << userInfo.Subscriptions;} } } } diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index ba9355f62c4..f7af353f3e8 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -281,15 +281,13 @@ void TPartition::Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContex if (!ev->Get()->ClientId.empty()) { TUserInfo* userInfo = UsersInfoStorage->GetIfExists(ev->Get()->ClientId); if (userInfo) { - i64 offset = Max<i64>(userInfo->Offset, 0); + auto snapshot = CreateSnapshot(*userInfo); result.SetClientOffset(userInfo->Offset); - TInstant tmp = userInfo->GetWriteTimestamp(EndOffset) ? userInfo->GetWriteTimestamp(EndOffset) : GetWriteTimeEstimate(offset); - result.SetWriteTimestampMS(tmp.MilliSeconds()); - result.SetCreateTimestampMS(userInfo->GetCreateTimestamp(EndOffset).MilliSeconds()); + result.SetWriteTimestampMS(snapshot.LastCommittedMessage.WriteTimestamp.MilliSeconds()); + result.SetCreateTimestampMS(snapshot.LastCommittedMessage.CreateTimestamp.MilliSeconds()); result.SetClientReadOffset(userInfo->GetReadOffset()); - tmp = userInfo->GetReadWriteTimestamp(EndOffset) ? userInfo->GetReadWriteTimestamp(EndOffset) : GetWriteTimeEstimate(userInfo->GetReadOffset()); - result.SetReadWriteTimestampMS(tmp.MilliSeconds()); - result.SetReadCreateTimestampMS(userInfo->GetReadCreateTimestamp(EndOffset).MilliSeconds()); + result.SetReadWriteTimestampMS(snapshot.LastReadMessage.WriteTimestamp.MilliSeconds()); + result.SetReadCreateTimestampMS(snapshot.LastReadMessage.CreateTimestamp.MilliSeconds()); } } ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionOffsetsResponse(result, Partition)); diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 5aaad167d9f..86559394bd0 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -37,6 +37,25 @@ static const TString CLIENTID_WITHOUT_CONSUMER = "$without_consumer"; typedef TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor> TUserLabeledCounters; +struct TMessageInfo { + TInstant CreateTimestamp; + TInstant WriteTimestamp; +}; + +struct TConsumerSnapshot { + TInstant Now; + + TMessageInfo LastCommittedMessage; + + i64 ReadOffset; + TInstant LastReadTimestamp; + TMessageInfo LastReadMessage; + + TDuration ReadLag; + TDuration CommitedLag; + TDuration TotalLag; +}; + struct TUserInfoBase { TString User; ui64 ReadRuleGeneration = 0; @@ -55,13 +74,20 @@ struct TUserInfoBase { }; struct TUserInfo: public TUserInfoBase { + bool ActualTimestamps = false; + // WriteTimestamp of the last committed message TInstant WriteTimestamp; + // CreateTimestamp of the last committed message TInstant CreateTimestamp; + + // Timstamp of the last read TInstant ReadTimestamp; - bool ActualTimestamps = false; i64 ReadOffset = -1; + + // WriteTimestamp of the last read message TInstant ReadWriteTimestamp; + // CreateTimestamp of the last read message TInstant ReadCreateTimestamp; ui64 ReadOffsetRewindSum = 0; @@ -175,13 +201,13 @@ struct TUserInfo: public TUserInfoBase { ) : TUserInfoBase{user, readRuleGeneration, session, gen, step, offset, anyCommits, important, readFromTimestamp, partitionSession, pipeClient} - , WriteTimestamp(TAppData::TimeProvider->Now()) - , CreateTimestamp(TAppData::TimeProvider->Now()) - , ReadTimestamp(TAppData::TimeProvider->Now()) , ActualTimestamps(false) + , WriteTimestamp(TInstant::Zero()) + , CreateTimestamp(TInstant::Zero()) + , ReadTimestamp(TAppData::TimeProvider->Now()) , ReadOffset(-1) - , ReadWriteTimestamp(TAppData::TimeProvider->Now()) - , ReadCreateTimestamp(TAppData::TimeProvider->Now()) + , ReadWriteTimestamp(TInstant::Zero()) + , ReadCreateTimestamp(TInstant::Zero()) , ReadOffsetRewindSum(readOffsetRewindSum) , ReadScheduled(false) , HasReadRule(false) @@ -334,30 +360,9 @@ struct TUserInfo: public TUserInfoBase { return ReadTimestamp; } - TInstant GetWriteTimestamp(i64 endOffset) const { - return Offset == endOffset ? TAppData::TimeProvider->Now() : WriteTimestamp; - } - - TInstant GetCreateTimestamp(i64 endOffset) const { - return Offset == endOffset ? TAppData::TimeProvider->Now() : CreateTimestamp; - } - - TInstant GetReadWriteTimestamp(i64 endOffset) const { - TInstant ts = ReadOffset == -1 ? WriteTimestamp : ReadWriteTimestamp; - ts = GetReadOffset() >= endOffset ? TAppData::TimeProvider->Now() : ts; - return ts; - } - ui64 GetWriteLagMs() const { return WriteLagMs.GetValue(); } - - TInstant GetReadCreateTimestamp(i64 endOffset) const { - TInstant ts = ReadOffset == -1 ? CreateTimestamp : ReadCreateTimestamp; - ts = GetReadOffset() >= endOffset ? TAppData::TimeProvider->Now() : ts; - return ts; - } - }; class TUsersInfoStorage { diff --git a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp new file mode 100644 index 00000000000..da5d349a71f --- /dev/null +++ b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp @@ -0,0 +1,165 @@ +#include <ydb/core/persqueue/ut/common/autoscaling_ut_common.h> + +#include <ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <ydb/core/persqueue/partition_key_range/partition_key_range.h> +#include <ydb/core/persqueue/partition_scale_manager.h> +#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> +#include <ydb/core/tx/schemeshard/ut_helpers/test_env.h> + +#include <util/stream/output.h> + +namespace NKikimr { + +using namespace NYdb::NTopic; +using namespace NYdb::NTopic::NTests; +using namespace NSchemeShardUT_Private; +using namespace NKikimr::NPQ::NTest; + +#define UNIT_ASSERT_TIME_EQUAL(A, B, D) \ + do { \ + if (!(((A - B) >= TDuration::Zero()) && ((A - B) <= D)) \ + && !(((B - A) >= TDuration::Zero()) && ((B - A) <= D))) { \ + auto&& failMsg = Sprintf("%s and %s diferent more then %s", (::TStringBuilder() << A).data(), \ + (::TStringBuilder() << B).data(), (::TStringBuilder() << D).data()); \ + UNIT_FAIL_IMPL("assertion failure", failMsg); \ + } \ + } while (false) + + +Y_UNIT_TEST_SUITE(WithSDK) { + + Y_UNIT_TEST(DescribeConsumer) { + TTopicSdkTestSetup setup = CreateSetup(); + setup.CreateTopic(std::string{TEST_TOPIC}, std::string{TEST_CONSUMER}, 1); + + auto describe = [&]() { + return setup.DescribeConsumer(TString{TEST_TOPIC}, TString{TEST_CONSUMER}); + }; + + auto write = [&](size_t seqNo) { + TTopicClient client(setup.MakeDriver()); + + TWriteSessionSettings settings; + settings.Path(TEST_TOPIC); + settings.PartitionId(0); + settings.DeduplicationEnabled(false); + auto session = client.CreateSimpleBlockingWriteSession(settings); + + TWriteMessage msg(TStringBuilder() << "message_" << seqNo); + msg.CreateTimestamp(TInstant::Now() - TDuration::Seconds(10 - seqNo)); + UNIT_ASSERT(session->Write(std::move(msg))); + + session->Close(TDuration::Seconds(5)); + }; + + // Check describe for empty topic + { + auto d = describe(); + UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName()); + UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size()); + auto& p = d.GetPartitions()[0]; + UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId()); + UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive()); + UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionStats()->GetEndOffset()); + auto& c = p.GetPartitionConsumerStats(); + UNIT_ASSERT_VALUES_EQUAL(true, c.has_value()); + UNIT_ASSERT_VALUES_EQUAL(0, c->GetCommittedOffset()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxWriteTimeLag()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag()); + UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero? + UNIT_ASSERT_VALUES_EQUAL(0, c->GetLastReadOffset()); + } + + write(3); + write(7); + + // Check describe for topic which contains messages, but consumer hasn`t read + { + auto d = describe(); + UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName()); + UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size()); + auto& p = d.GetPartitions()[0]; + UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId()); + UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive()); + UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset()); + auto& c = p.GetPartitionConsumerStats(); + UNIT_ASSERT_VALUES_EQUAL(true, c.has_value()); + UNIT_ASSERT_VALUES_EQUAL(0, c->GetCommittedOffset()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); // + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag()); + UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero? + UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset()); + } + + UNIT_ASSERT(setup.Commit(TString{TEST_TOPIC}, TEST_CONSUMER, 0, 1).IsSuccess()); + + // Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example) + { + auto d = describe(); + UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName()); + UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size()); + auto& p = d.GetPartitions()[0]; + UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId()); + UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive()); + UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset()); + auto& c = p.GetPartitionConsumerStats(); + UNIT_ASSERT_VALUES_EQUAL(true, c.has_value()); + UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag()); + UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero? + UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset()); + } + + { + TTopicClient client(setup.MakeDriver()); + TReadSessionSettings settings; + settings.ConsumerName(TEST_CONSUMER); + settings.AppendTopics(TTopicReadSettings().Path(TEST_TOPIC)); + + auto session = client.CreateReadSession(settings); + + TInstant endTime = TInstant::Now() + TDuration::Seconds(5); + while (true) { + auto e = session->GetEvent(); + if (e) { + Cerr << ">>>>> Event = " << e->index() << Endl << Flush; + } + if (e && std::holds_alternative<TReadSessionEvent::TDataReceivedEvent>(e.value())) { + // we must recive only one date event with second message + break; + } else if (e && std::holds_alternative<TReadSessionEvent::TStartPartitionSessionEvent>(e.value())) { + std::get<TReadSessionEvent::TStartPartitionSessionEvent>(e.value()).Confirm(); + } + UNIT_ASSERT_C(endTime > TInstant::Now(), "Unable wait"); + } + } + + // Check describe for topic wich contains messages, has commited offset of first message and read second message + { + auto d = describe(); + UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName()); + UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size()); + auto& p = d.GetPartitions()[0]; + UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId()); + UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive()); + UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset()); + auto& c = p.GetPartitionConsumerStats(); + UNIT_ASSERT_VALUES_EQUAL(true, c.has_value()); + UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag()); + UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); + UNIT_ASSERT_VALUES_EQUAL(2, c->GetLastReadOffset()); + } + + } +} + +} // namespace NKikimr diff --git a/ydb/core/persqueue/ut/ut_with_sdk/ya.make b/ydb/core/persqueue/ut/ut_with_sdk/ya.make index fdb6c2ff405..eb370dff024 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/ya.make +++ b/ydb/core/persqueue/ut/ut_with_sdk/ya.make @@ -32,6 +32,7 @@ SRCS( autoscaling_ut.cpp balancing_ut.cpp mirrorer_ut.cpp + topic_ut.cpp ) END() diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 2f27923c359..b2c682c588f 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -751,6 +751,7 @@ message TClientInfo { optional uint64 ReadLagMs = 5; optional uint64 LastReadTimestampMs = 8; optional uint64 TotalLagMs = 9; + optional uint64 CommitedLagMs = 10; } @@ -851,6 +852,7 @@ message TStatusResponse { optional uint64 CommitedOffset = 9; optional bool ReadingFinished = 10; + optional uint64 CommitedLagMs = 11; } diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index a45a8c67314..5ac93836d15 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -810,6 +810,8 @@ message Consumer { google.protobuf.Duration max_read_time_lag = 2; // Maximum of differences between write timestamp and create timestamp for all messages, read during last minute. google.protobuf.Duration max_write_time_lag = 3; + // The difference between the write timestamp of the last commited message and the current time. + google.protobuf.Duration max_committed_time_lag = 5; // Bytes read statistics. MultipleWindowsStat bytes_read = 4; } @@ -1214,6 +1216,8 @@ message DescribeConsumerResult { google.protobuf.Duration max_read_time_lag = 6; // Maximum of differences between write timestamp and create timestamp for all messages, read during last minute. google.protobuf.Duration max_write_time_lag = 7; + // The difference between the write timestamp of the last commited message and the current time. + google.protobuf.Duration max_committed_time_lag = 13; // How much bytes were read during several windows statistics from this partition. MultipleWindowsStat bytes_read = 8; diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h index c776e3f5b45..09611b1e07c 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h @@ -111,6 +111,7 @@ public: const TInstant& GetLastReadTime() const; const TDuration& GetMaxReadTimeLag() const; const TDuration& GetMaxWriteTimeLag() const; + const TDuration& GetMaxCommittedTimeLag() const; private: uint64_t CommittedOffset_; @@ -120,6 +121,7 @@ private: TInstant LastReadTime_; TDuration MaxReadTimeLag_; TDuration MaxWriteTimeLag_; + TDuration MaxCommittedTimeLag_; }; // Topic partition location diff --git a/ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp b/ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp index 280cd800b59..1717f29cd7e 100644 --- a/ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp @@ -376,6 +376,7 @@ TPartitionConsumerStats::TPartitionConsumerStats(const Ydb::Topic::DescribeConsu , LastReadTime_(TInstant::Seconds(partitionStats.last_read_time().seconds())) , MaxReadTimeLag_(TDuration::Seconds(partitionStats.max_read_time_lag().seconds())) , MaxWriteTimeLag_(TDuration::Seconds(partitionStats.max_write_time_lag().seconds())) + , MaxCommittedTimeLag_(TDuration::Seconds(partitionStats.max_committed_time_lag().seconds())) {} uint64_t TPartitionConsumerStats::GetCommittedOffset() const { @@ -406,6 +407,10 @@ const TDuration& TPartitionConsumerStats::GetMaxWriteTimeLag() const { return MaxWriteTimeLag_; } +const TDuration& TPartitionConsumerStats::GetMaxCommittedTimeLag() const { + return MaxCommittedTimeLag_; +} + TPartitionLocation::TPartitionLocation(const Ydb::Topic::PartitionLocation& partitionLocation) : NodeId_(partitionLocation.node_id()) , Generation_(partitionLocation.generation()) diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index c3253d1ea76..b61ec85322e 100644 --- a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -66,6 +66,27 @@ TTopicDescription TTopicSdkTestSetup::DescribeTopic(const TString& path) return status.GetTopicDescription(); } +TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const TString& path, const TString& consumer) +{ + TTopicClient client(MakeDriver()); + + TDescribeConsumerSettings settings; + settings.IncludeStats(true); + settings.IncludeLocation(true); + + auto status = client.DescribeConsumer(path, consumer, settings).GetValueSync(); + UNIT_ASSERT(status.IsSuccess()); + + return status.GetConsumerDescription(); +} + +TStatus TTopicSdkTestSetup::Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset) { + TTopicClient client(MakeDriver()); + + return client.CommitOffset(path, partitionId, consumerName, offset).GetValueSync(); +} + + TString TTopicSdkTestSetup::GetEndpoint() const { return "localhost:" + ToString(Server.GrpcPort); } diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h index 3473b883b7c..4b6ee30e296 100644 --- a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h @@ -22,6 +22,9 @@ public: size_t maxPartitionCount = 100); TTopicDescription DescribeTopic(const TString& path = TString{TEST_TOPIC}); + TConsumerDescription DescribeConsumer(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TString{TEST_CONSUMER}); + + TStatus Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset); TString GetEndpoint() const; TString GetTopicPath(const TString& name = TString{TEST_TOPIC}) const; diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index bfd6519c95a..fd1ac8a47a3 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -874,12 +874,14 @@ void TDescribeTopicActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPer SetProtoTime(stats->mutable_min_partitions_last_read_time(), cons.GetLastReadTimestampMs()); SetProtoTime(stats->mutable_max_read_time_lag(), cons.GetReadLagMs()); SetProtoTime(stats->mutable_max_write_time_lag(), cons.GetWriteLagMs()); + SetProtoTime(stats->mutable_max_committed_time_lag(), cons.GetCommitedLagMs()); } else { auto* stats = it->second->mutable_consumer_stats(); UpdateProtoTime(stats->mutable_min_partitions_last_read_time(), cons.GetLastReadTimestampMs(), true); UpdateProtoTime(stats->mutable_max_read_time_lag(), cons.GetReadLagMs(), false); UpdateProtoTime(stats->mutable_max_write_time_lag(), cons.GetWriteLagMs(), false); + UpdateProtoTime(stats->mutable_max_committed_time_lag(), cons.GetCommitedLagMs(), false); } AddWindowsStat(it->second->mutable_consumer_stats()->mutable_bytes_read(), cons.GetAvgReadSpeedPerMin(), cons.GetAvgReadSpeedPerHour(), cons.GetAvgReadSpeedPerDay()); @@ -988,6 +990,7 @@ void TDescribeConsumerActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEv SetProtoTime(consStats->mutable_last_read_time(), partResult.GetLagsInfo().GetLastReadTimestampMs()); SetProtoTime(consStats->mutable_max_read_time_lag(), partResult.GetLagsInfo().GetReadLagMs()); SetProtoTime(consStats->mutable_max_write_time_lag(), partResult.GetLagsInfo().GetWriteLagMs()); + SetProtoTime(consStats->mutable_max_committed_time_lag(), partResult.GetLagsInfo().GetCommitedLagMs()); AddWindowsStat(consStats->mutable_bytes_read(), partResult.GetAvgReadSpeedPerMin(), partResult.GetAvgReadSpeedPerHour(), partResult.GetAvgReadSpeedPerDay()); @@ -997,12 +1000,14 @@ void TDescribeConsumerActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEv SetProtoTime(stats->mutable_min_partitions_last_read_time(), partResult.GetLagsInfo().GetLastReadTimestampMs()); SetProtoTime(stats->mutable_max_read_time_lag(), partResult.GetLagsInfo().GetReadLagMs()); SetProtoTime(stats->mutable_max_write_time_lag(), partResult.GetLagsInfo().GetWriteLagMs()); + SetProtoTime(stats->mutable_max_committed_time_lag(), partResult.GetLagsInfo().GetCommitedLagMs()); } else { auto* stats = Result.mutable_consumer()->mutable_consumer_stats(); UpdateProtoTime(stats->mutable_min_partitions_last_read_time(), partResult.GetLagsInfo().GetLastReadTimestampMs(), true); UpdateProtoTime(stats->mutable_max_read_time_lag(), partResult.GetLagsInfo().GetReadLagMs(), false); UpdateProtoTime(stats->mutable_max_write_time_lag(), partResult.GetLagsInfo().GetWriteLagMs(), false); + UpdateProtoTime(stats->mutable_max_committed_time_lag(), partResult.GetLagsInfo().GetCommitedLagMs(), false); } } } |