diff options
author | FloatingCrowbar <komels@ydb.tech> | 2024-06-18 10:07:19 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-18 10:07:19 +0300 |
commit | d23ebbe747a462f1bb29afaf810ff41cb1d4cd96 (patch) | |
tree | f77ae65cff21d847b7ab85e3ecacfbab0e535c48 | |
parent | 22b4f2dbdcb83cf996a418249f28a9d391101cf9 (diff) | |
download | ydb-d23ebbe747a462f1bb29afaf810ff41cb1d4cd96.tar.gz |
Publish tx counters (#5534)
-rw-r--r-- | ydb/core/persqueue/events/internal.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 43 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 22 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_init.cpp | 25 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_types.h | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 57 | ||||
-rw-r--r-- | ydb/core/persqueue/percentile_counter.cpp | 89 | ||||
-rw-r--r-- | ydb/core/persqueue/percentile_counter.h | 28 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/counters_ut.cpp | 68 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 95 |
10 files changed, 379 insertions, 51 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index f80db7b3f7..bf0354ec46 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -5,6 +5,7 @@ #include <ydb/core/base/row_version.h> #include <ydb/core/protos/pqconfig.pb.h> #include <ydb/core/persqueue/blob.h> +#include <ydb/core/persqueue/percentile_counter.h> #include <ydb/core/persqueue/key.h> #include <ydb/core/persqueue/sourceid_info.h> #include <ydb/core/persqueue/metering_sink.h> @@ -1080,6 +1081,7 @@ struct TEvPQ { ui64 MessagesWrittenTotal; ui64 MessagesWrittenGrpc; TVector<ui64> MessagesSizes; + THolder<NPQ::TMultiBucketCounter> InputLags; }; struct TEvGetWriteInfoError : public TEventLocal<TEvGetWriteInfoError, EvGetWriteInfoError> { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index a015e98fbe..23827c5f49 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -202,12 +202,6 @@ TPartition::TPartition(ui64 tabletId, const TPartitionId& partition, const TActo , InitDone(false) , NewPartition(newPartition) , Subscriber(partition, TabletCounters, Tablet) - , WriteCycleSize(0) - , WriteNewSize(0) - , WriteNewSizeInternal(0) - , WriteNewSizeUncompressed(0) - , WriteNewMessages(0) - , WriteNewMessagesInternal(0) , DiskIsFull(false) , SubDomainOutOfSpace(subDomainOutOfSpace) , HasDataReqNum(0) @@ -1018,6 +1012,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon response->MessagesWrittenTotal = MsgsWrittenTotal.Value(); response->MessagesWrittenGrpc = MsgsWrittenGrpc.Value(); response->MessagesSizes = std::move(MessageSize.GetValues()); + response->InputLags = std::move(SupportivePartitionTimeLag); ctx.Send(ev->Sender, response); } @@ -1850,6 +1845,36 @@ void TPartition::RunPersist() { AddCmdWriteConfig(PersistRequest->Record); } if (PersistRequest->Record.CmdDeleteRangeSize() || PersistRequest->Record.CmdWriteSize() || PersistRequest->Record.CmdRenameSize()) { + // Apply counters + for (const auto& writeInfo : WriteInfosApplied) { + // writeTimeLag + if (InputTimeLag && writeInfo->InputLags) { + writeInfo->InputLags->UpdateTimestamp(ctx.Now().MilliSeconds()); + for (const auto& values : writeInfo->InputLags->GetValues()) { + if (values.second) + InputTimeLag->IncFor(std::ceil(values.first), values.second); + } + } + //MessageSize + auto i = 0u; + for (auto range : MessageSize.GetRanges()) { + if (i >= writeInfo->MessagesSizes.size()) { + break; + } + MessageSize.IncFor(range, writeInfo->MessagesSizes[i++]); + } + + // Bytes Written + BytesWrittenTotal.Inc(writeInfo->BytesWrittenTotal); + BytesWrittenGrpc.Inc(writeInfo->BytesWrittenGrpc); + BytesWrittenUncompressed.Inc(writeInfo->BytesWrittenUncompressed); + // Messages written + MsgsWrittenTotal.Inc(writeInfo->MessagesWrittenTotal); + MsgsWrittenGrpc.Inc(writeInfo->MessagesWrittenTotal); + } + WriteInfosApplied.clear(); + //Done with counters. + ctx.Send(HaveWriteMsg ? BlobCache : Tablet, PersistRequest.Release()); KVWriteInProgress = true; } else { @@ -2044,7 +2069,7 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event) return true; } -void TPartition::CommitWriteOperations(const TTransaction& t) +void TPartition::CommitWriteOperations(TTransaction& t) { if (!t.WriteInfo) { return; @@ -2072,10 +2097,12 @@ void TPartition::CommitWriteOperations(const TTransaction& t) .IgnoreQuotaDeadline = true, .HeartbeatVersion = std::nullopt, }, std::nullopt}; + msg.Internal = true; TMessage message(std::move(msg), ctx.Now() - TInstant::Zero()); UserActionAndTxPendingCommit.emplace_front(std::move(message)); } + WriteInfosApplied.emplace_back(std::move(t.WriteInfo)); } void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t) @@ -2370,7 +2397,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE return EProcessResult::Continue; } -void TPartition::ExecImmediateTx(const TTransaction& t) +void TPartition::ExecImmediateTx(TTransaction& t) { --ImmediateTxCount; auto& record = t.ProposeTransaction->Record; diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index b4ddc3de15..01933c08e0 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -28,6 +28,7 @@ namespace NKikimr::NPQ { static const ui32 MAX_BLOB_PART_SIZE = 500_KB; +static const ui32 DEFAULT_BUCKET_COUNTER_MULTIPLIER = 20; using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>; @@ -429,7 +430,7 @@ private: void Handle(TEvPQ::TEvProcessChangeOwnerRequests::TPtr& ev, const TActorContext& ctx); void StartProcessChangeOwnerRequests(const TActorContext& ctx); - void CommitWriteOperations(const TTransaction& t); + void CommitWriteOperations(TTransaction& t); void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx); @@ -704,7 +705,7 @@ private: [[nodiscard]] EProcessResult PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx); - void ExecImmediateTx(const TTransaction& tx); + void ExecImmediateTx(TTransaction& tx); EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg); EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg); @@ -757,6 +758,7 @@ private: std::deque<TUserActionAndTransactionEvent> UserActionAndTransactionEvents; std::deque<TUserActionAndTransactionEvent> UserActionAndTxPendingCommit; + TVector<THolder<TEvPQ::TEvGetWriteInfoResponse>> WriteInfosApplied; THashMap<ui64, TSimpleSharedPtr<TTransaction>> TransactionsInflight; THashMap<TActorId, TSimpleSharedPtr<TTransaction>> WriteInfosToTx; @@ -814,14 +816,17 @@ private: TSubscriber Subscriber; TInstant WriteCycleStartTime; - ui32 WriteCycleSize; + ui32 WriteCycleSize = 0; ui32 WriteCycleSizeEstimate = 0; ui32 WriteKeysSizeEstimate = 0; - ui32 WriteNewSize; - ui32 WriteNewSizeInternal; - ui64 WriteNewSizeUncompressed; - ui32 WriteNewMessages; - ui32 WriteNewMessagesInternal; + ui32 WriteNewSize = 0; + ui32 WriteNewSizeFull = 0; + ui32 WriteNewSizeInternal = 0; + ui64 WriteNewSizeUncompressed = 0; + ui64 WriteNewSizeUncompressedFull = 0; + + ui32 WriteNewMessages = 0; + ui32 WriteNewMessagesInternal = 0; TInstant CurrentTimestamp; @@ -860,6 +865,7 @@ private: NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> WriteLagMs; //ToDo - counters. THolder<TPercentileCounter> InputTimeLag; + THolder<TMultiBucketCounter> SupportivePartitionTimeLag; TPartitionHistogramWrapper MessageSize; TPercentileCounter WriteLatency; diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index 15d9964d0a..e914205436 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -8,7 +8,6 @@ namespace NKikimr::NPQ { static const ui32 LEVEL0 = 32; static const TString WRITE_QUOTA_ROOT_PATH = "write-quota"; - bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev); void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key); void RequestDataRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key); @@ -789,7 +788,7 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) { subGroup = GetServiceCounters(counters, "pqproxy|writeInfo"); { - std::unique_ptr<TPercentileCounter> percentileCounter(new TPercentileCounter( + std::unique_ptr<TPercentileCounter> percentileCounter(new TPercentileCounter( subGroup, labels, {{"sensor", "MessageSize" + suffix}}, "Size", TVector<std::pair<ui64, TString>>{ {1_KB, "1kb"}, {5_KB, "5kb"}, {10_KB, "10kb"}, @@ -869,15 +868,21 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { subgroups.push_back({"name", "topic.write.lag_milliseconds"}); - InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), {}, - subgroups, "bin", - TVector<std::pair<ui64, TString>>{ - {100, "100"}, {200, "200"}, {500, "500"}, - {1000, "1000"}, {2000, "2000"}, {5000, "5000"}, - {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"}, - {180'000,"180000"}, {9'999'999, "999999"}}, true)); + if (IsSupportive()) { + SupportivePartitionTimeLag = MakeHolder<TMultiBucketCounter>( + TVector<ui64>{100, 200, 500, 1000, 2000, 5000, 10'000, 30'000, 60'000, 180'000, 9'999'999}, + DEFAULT_BUCKET_COUNTER_MULTIPLIER, ctx.Now().MilliSeconds()); + } else { + InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, + subgroups, "bin", + TVector<std::pair<ui64, TString>>{ + {100, "100"}, {200, "200"}, {500, "500"}, + {1000, "1000"}, {2000, "2000"}, {5000, "5000"}, + {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"}, + {180'000,"180000"}, {9'999'999, "999999"}}, true)); + } subgroups.back().second = "topic.write.message_size_bytes"; { std::unique_ptr<TPercentileCounter> percentileCounter(new TPercentileCounter( diff --git a/ydb/core/persqueue/partition_types.h b/ydb/core/persqueue/partition_types.h index 9ad5dded83..c43b147ecb 100644 --- a/ydb/core/persqueue/partition_types.h +++ b/ydb/core/persqueue/partition_types.h @@ -20,6 +20,7 @@ struct TWriteMsg { TMaybe<ui64> Offset; TEvPQ::TEvWrite::TMsg Msg; std::optional<ui64> InitialSeqNo; + bool Internal = false; }; struct TOwnershipMsg { diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 3a06980de9..2d52126041 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -312,7 +312,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { ", Offset: " << offset << " is " << (already ? "already written" : "stored on disk") ); - if (PartitionWriteQuotaWaitCounter) { + if (PartitionWriteQuotaWaitCounter && !writeResponse.Internal) { PartitionWriteQuotaWaitCounter->IncFor(PartitionQuotaWaitTimeForCurrentBlob.MilliSeconds()); } if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion) @@ -524,15 +524,20 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { "TPartition::HandleWriteResponse writeNewSize# " << WriteNewSize; ); + if (SupportivePartitionTimeLag) { + SupportivePartitionTimeLag->UpdateTimestamp(now.MilliSeconds()); + } if (SplitMergeEnabled(Config)) { - SplitMergeAvgWriteBytes->Update(WriteNewSize, now); + SplitMergeAvgWriteBytes->Update(WriteNewSizeFull, now); auto needScaling = CheckScaleStatus(ctx); ChangeScaleStatusIfNeeded(needScaling); } WriteCycleSize = 0; WriteNewSize = 0; + WriteNewSizeFull = 0; WriteNewSizeInternal = 0; WriteNewSizeUncompressed = 0; + WriteNewSizeUncompressedFull = 0; WriteNewMessages = 0; WriteNewMessagesInternal = 0; UpdateWriteBufferIsFullState(now); @@ -1044,14 +1049,17 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey << ". Writing seqNo: " << sourceId.UpdatedSeqNo() << ". EndOffset: " << EndOffset << ". CurOffset: " << curOffset << ". Offset: " << poffset ); - - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1); - MsgsDiscarded.Inc(); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size()); - BytesDiscarded.Inc(p.Msg.Data.size()); + if (!p.Internal) { + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1); + MsgsDiscarded.Inc(); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size()); + BytesDiscarded.Inc(p.Msg.Data.size()); + } } else { - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1); - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size()); + if (!p.Internal) { + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size()); + } } TString().swap(p.Msg.Data); @@ -1153,14 +1161,17 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey ctx.Send(Tablet, new TEvents::TEvPoisonPill()); return false; } - - WriteNewSize += p.Msg.SourceId.size() + p.Msg.Data.size(); - WriteNewSizeInternal += p.Msg.External ? 0 : (p.Msg.SourceId.size() + p.Msg.Data.size()); - WriteNewSizeUncompressed += p.Msg.UncompressedSize + p.Msg.SourceId.size(); - if (p.Msg.PartNo == 0) { - ++WriteNewMessages; - if (!p.Msg.External) - ++WriteNewMessagesInternal; + WriteNewSizeFull += p.Msg.SourceId.size() + p.Msg.Data.size(); + WriteNewSizeUncompressedFull += p.Msg.UncompressedSize + p.Msg.SourceId.size(); + if (!p.Internal) { + WriteNewSize += p.Msg.SourceId.size() + p.Msg.Data.size(); + WriteNewSizeUncompressed += p.Msg.UncompressedSize + p.Msg.SourceId.size(); + WriteNewSizeInternal += p.Msg.External ? 0 : (p.Msg.SourceId.size() + p.Msg.Data.size()); + } + if (p.Msg.PartNo == 0 && !p.Internal) { + ++WriteNewMessages; + if (!p.Msg.External) + ++WriteNewMessagesInternal; } TMaybe<TPartData> partData; @@ -1176,13 +1187,14 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey const ui64 writeLagMs = (WriteTimestamp - TInstant::MilliSeconds(p.Msg.CreateTimestamp)).MilliSeconds(); WriteLagMs.Update(writeLagMs, WriteTimestamp); - if (InputTimeLag) { + if (InputTimeLag && !p.Internal) { InputTimeLag->IncFor(writeLagMs, 1); - if (p.Msg.PartNo == 0) { - MessageSize.IncFor(p.Msg.TotalSize + p.Msg.SourceId.size(), 1); - } + } else if (SupportivePartitionTimeLag) { + SupportivePartitionTimeLag->Insert(writeLagMs, 1); + } + if (p.Msg.PartNo == 0 && !p.Internal) { + MessageSize.IncFor(p.Msg.TotalSize + p.Msg.SourceId.size(), 1); } - bool lastBlobPart = blob.IsLastPart(); //will return compacted tmp blob @@ -1616,6 +1628,7 @@ void TPartition::BeginAppendHeadWithNewWrites(const TActorContext& ctx) WriteCycleSize = 0; WriteNewSize = 0; WriteNewSizeUncompressed = 0; + WriteNewSizeUncompressed = 0; WriteNewMessages = 0; UpdateWriteBufferIsFullState(ctx.Now()); CurrentTimestamp = ctx.Now(); diff --git a/ydb/core/persqueue/percentile_counter.cpp b/ydb/core/persqueue/percentile_counter.cpp index 9cd4e7f7d2..85edf4a950 100644 --- a/ydb/core/persqueue/percentile_counter.cpp +++ b/ydb/core/persqueue/percentile_counter.cpp @@ -193,10 +193,99 @@ TVector<ui64> TPartitionHistogramWrapper::GetValues() const { } return res; } +const TVector<ui64>& TPartitionHistogramWrapper::GetRanges() const { + Y_ABORT_UNLESS(!IsSupportivePartition); + return Histogram->Ranges; +} TPartitionHistogramWrapper::operator bool() const { return Inited && (IsSupportivePartition || Histogram); } + +ui64 TMultiBucketCounter::InsertWithHint(double value, ui64 count, ui64 hint) noexcept { + if (!count) { + return hint; + } + while (hint < Buckets.size()) { + if (Buckets[hint].Range < value) { + ++hint; + continue; + } else { + break; + } + } + auto& bucket = Buckets[hint]; + auto newAvg = (bucket.AvgValue * bucket.ValuesCount + count * value) / (bucket.ValuesCount + count); + + bucket.ValuesCount += count; + bucket.AvgValue = newAvg; + return hint; +} + +void TMultiBucketCounter::UpdateTimestamp(ui64 newTimeRef) +{ + if (newTimeRef <= TimeReference) { // Cannot update in the past + return; + } + ui64 timeDiff = newTimeRef - TimeReference; + auto oldBuckets = std::move(Buckets); + Buckets = TVector<TBucket>(oldBuckets.size()); + for (auto i = 0u; i < Buckets.size(); ++i) { + Buckets[i].Range = oldBuckets[i].Range; + } + ui64 hint = 0; + for (const auto& b : oldBuckets) { + hint = InsertWithHint(b.AvgValue + timeDiff, b.ValuesCount, hint); + } + TimeReference = newTimeRef; +} + +TMultiBucketCounter::TMultiBucketCounter(const TVector<ui64>& buckets, ui64 multiplier, ui64 timeRef) + : Buckets(buckets.size() * multiplier + 1) + , TimeReference(timeRef) +{ + ui64 prev = 0; + ui64 i = 0; + for (auto b : buckets) { + ui64 step = (b - prev) / multiplier; + for (auto j = 1u; j < multiplier; ++j) { + Buckets[i++].Range = prev + j * step; + } + Buckets[i++].Range = b; + prev = b; + } + Buckets[i++].Range = std::numeric_limits<ui64>::max(); +} + +void TMultiBucketCounter::Insert(i64 value, ui64 count) noexcept { + if (value < 0) { + InsertWithHint(0, count, 0); + return; + } + + ui64 begin = 0, end = Buckets.size() - 1; + while (end - begin > 10) { + ui64 median = begin + (end - begin) / 2; + if (Buckets[median].Range >= (ui64)value) { + end = median; + } else { + begin = median; + } + } + InsertWithHint((ui64)value, count, begin); +} + +TVector<std::pair<double, ui64>> TMultiBucketCounter::GetValues(bool allowZeroes) const noexcept { + TVector<std::pair<double, ui64>> result; + for (const auto b: Buckets) { + if (!allowZeroes && b.ValuesCount == 0) { + continue; + } + result.push_back(std::make_pair(b.AvgValue, b.ValuesCount)); + } + return result; +} + } // NPQ } // NKikimr diff --git a/ydb/core/persqueue/percentile_counter.h b/ydb/core/persqueue/percentile_counter.h index a06dde78dc..89ec2a2ddd 100644 --- a/ydb/core/persqueue/percentile_counter.h +++ b/ydb/core/persqueue/percentile_counter.h @@ -97,8 +97,36 @@ public: iter++; } } + const TVector<ui64>& GetRanges() const; operator bool() const; }; + +class TMultiBucketCounter { +private: + struct TBucket { + ui64 Range; + double AvgValue = 0; + ui64 ValuesCount = 0; + TBucket() = default; + explicit TBucket(ui64 range) + : Range(range) + , AvgValue(0) + , ValuesCount(0) + {} + }; + TVector<TBucket> Buckets; + ui64 TimeReference; + + ui64 InsertWithHint(double value, ui64 count, ui64 hint) noexcept; + +public: + TMultiBucketCounter(const TVector<ui64>& buckets, ui64 multiplier, ui64 timeRef); + void UpdateTimestamp(ui64 newTimeReference); + void Insert(i64 value, ui64 count) noexcept; + TVector<std::pair<double, ui64>> GetValues(bool allowZeroes = false) const noexcept; + +}; + }// NPQ }// NKikimr diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp index 7a15805c84..a2d38248a4 100644 --- a/ydb/core/persqueue/ut/counters_ut.cpp +++ b/ydb/core/persqueue/ut/counters_ut.cpp @@ -4,6 +4,8 @@ #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/mon/sync_http_mon.h> #include <ydb/core/persqueue/ut/common/pq_ut_common.h> +#include <ydb/core/persqueue/percentile_counter.h> +#include <ydb/core/persqueue/partition.h> #include <ydb/core/sys_view/service/sysview_service.h> #include <ydb/core/testlib/fake_scheme_shard.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> @@ -185,7 +187,7 @@ Y_UNIT_TEST(PartitionFirstClass) { dbGroup->OutputHtml(countersStr); TString referenceCounters = NResource::Find(TStringBuf("counters_pqproxy_firstclass.html")); - UNIT_ASSERT_VALUES_EQUAL(countersStr.Str() + "\n", referenceCounters); + UNIT_ASSERT_EQUAL(countersStr.Str() + "\n", referenceCounters); } { @@ -427,8 +429,7 @@ Y_UNIT_TEST(PartitionFirstClass) { ->GetSubgroup("database", "/Root") ->GetSubgroup("cloud_id", "cloud_id") ->GetSubgroup("folder_id", "folder_id") - ->GetSubgroup("database_id", "database_id") - ->GetSubgroup("topic", "topic"); + ->GetSubgroup("database_id", "database_id")->GetSubgroup("topic", "topic"); group->GetNamedCounter("name", "topic.partition.uptime_milliseconds_min", false)->Set(30000); group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(600); group->GetNamedCounter("name", "topic.partition.uptime_milliseconds_min", false)->Set(30000); @@ -560,4 +561,65 @@ Y_UNIT_TEST(ImportantFlagSwitching) { } // Y_UNIT_TEST_SUITE(PQCountersLabeled) +Y_UNIT_TEST_SUITE(TMultiBucketCounter) { +void CheckBucketsValues(const TVector<std::pair<double, ui64>>& actual, const TVector<std::pair<double, ui64>>& expected) { + UNIT_ASSERT_VALUES_EQUAL(actual.size(), expected.size()); + for (auto i = 0u; i < expected.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(actual[i].second, expected[i].second); + UNIT_ASSERT_C(abs(actual[i].first - expected[i].first) < 0.0001, TStringBuilder() << actual[i].first << "-" << expected[i].first); + } +} + +Y_UNIT_TEST(InsertAndUpdate) { + TMultiBucketCounter counter({100, 200, 500, 1000, 5000}, 5, 0); + counter.Insert(19, 3); + counter.Insert(15, 1); + counter.Insert(17, 1); + + counter.Insert(100, 1); + counter.Insert(50001, 5); + + CheckBucketsValues(counter.GetValues(), {{(19*3 + 15 + 17) / 5.0, 5}, {100, 1}, {50001, 5}}); + + counter.UpdateTimestamp(50); + + CheckBucketsValues(counter.GetValues(), {{50.0 + (19*3 + 15 + 17) / 5.0, 5}, {150, 1}, {50051, 5}}); + counter.Insert(190, 1); + counter.Insert(155, 1); + + CheckBucketsValues(counter.GetValues(), {{50.0 + (19*3 + 15 + 17) / 5.0, 5}, {152.5, 2}, {190, 1}, {50051, 5}}); + + counter.UpdateTimestamp(1050); + + CheckBucketsValues(counter.GetValues(), {{(1067.8 * 5 + 1152.5 * 2 + 1190) / 8, 8}, {51051, 5}}); +} + +Y_UNIT_TEST(ManyCounters) { + TVector<ui64> buckets = {100, 200, 500, 1000, 2000, 5000}; + ui64 multiplier = 20; + TMultiBucketCounter counter(buckets, multiplier, 0); + for (auto i = 1u; i <= 5000; i++) { + counter.Insert(1, 1); + counter.UpdateTimestamp(i); + } + counter.Insert(1, 1); + + const auto& values = counter.GetValues(true); + ui64 prev = 0; + for (auto i = 0u; i < buckets.size(); i++) { + ui64 sum = 0u; + for (auto j = 0u; j < multiplier; j++) { + sum += values[i * multiplier + j].second; + } + Cerr << "Bucket: " << buckets[i] << " elems count: " << sum << Endl; + ui64 bucketSize = buckets[i] - prev; + prev = buckets[i]; + i64 diff = sum - bucketSize; + UNIT_ASSERT(std::abs(diff) < (i64)(bucketSize / 10)); + } + +} + +} // Y_UNIT_TEST_SUITE(TMultiBucketCounter) + } // namespace NKikimr::NPQ diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index f7474278bc..c365822e16 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -42,6 +42,7 @@ #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h> #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/include/client.h> #include <thread> @@ -7225,5 +7226,99 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(diff >= timeNeededToWaitQuotaAfterReadToEnd * 2); } + Y_UNIT_TEST(TxCounters) { + TServerSettings settings = PQSettings(0, 1); + settings.PQConfig.SetTopicsAreFirstClassCitizen(true); + settings.PQConfig.SetRoot("/Root"); + settings.PQConfig.SetDatabase("/Root"); + settings.SetEnableTopicServiceTx(true); + NPersQueue::TTestServer server{settings, true}; + + server.EnableLogs({ NKikimrServices::PERSQUEUE }, NActors::NLog::PRI_INFO); + + + NYdb::TDriverConfig config; + config.SetEndpoint(TStringBuilder() << "localhost:" + ToString(server.GrpcPort)); + config.SetDatabase("/Root"); + config.SetAuthToken("root@builtin"); + auto driver = NYdb::TDriver(config); + + NYdb::NTable::TTableClient client(driver); + NYdb::NTopic::TTopicClient topicClient(driver); + + auto result = client.CreateSession().ExtractValueSync(); + auto tableSession = result.GetSession(); + auto txResult = tableSession.BeginTransaction().ExtractValueSync(); + auto tx = txResult.GetTransaction(); + + TString topic = "topic"; + NYdb::NTopic::TCreateTopicSettings createSettings; + createSettings.BeginConfigurePartitioningSettings() + .MinActivePartitions(1) + .MaxActivePartitions(1); + + auto status = topicClient.CreateTopic(topic, createSettings).GetValueSync(); + UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToString()); + server.WaitInit(topic); + + NYdb::NTopic::TWriteSessionSettings options; + options.Path(topic); + options.ProducerId("123"); + auto writer = topicClient.CreateWriteSession(options); + + auto send = [&](ui64 dataSize, const TDuration& writeLag) { + while (true) { + auto msg = writer->GetEvent(true); + UNIT_ASSERT(msg); + auto ev = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*msg); + if (!ev) + continue; + TString data("a", dataSize); + NYdb::NTopic::TWriteMessage writeMsg{data}; + writeMsg.CreateTimestamp(TInstant::Now() - writeLag); + writeMsg.Codec = NYdb::NTopic::ECodec::RAW; + writeMsg.Tx(tx); + writer->Write(std::move(ev->ContinuationToken), std::move(writeMsg)); + return; + } + }; + send(10, TDuration::MilliSeconds(1001)); + send(5 * 1024 + 1, TDuration::MilliSeconds(1101)); + send(5 * 1024 + 1, TDuration::MilliSeconds(1201)); + send(10241, TDuration::MilliSeconds(2505)); + writer->Close(); + + auto commitResult = tx.Commit().ExtractValueSync(); + UNIT_ASSERT(commitResult.GetStatus() == NYdb::EStatus::SUCCESS); + + auto counters = server.GetRuntime()->GetAppData(0).Counters; + auto serviceCounters = GetServiceCounters(counters, "datastreams", false); + auto dbGroup = serviceCounters->GetSubgroup("database", "/Root") + ->GetSubgroup("cloud_id", "") + ->GetSubgroup("folder_id", "") + ->GetSubgroup("database_id", "")->GetSubgroup("topic", "topic"); + TStringStream countersStr; + dbGroup->OutputHtml(countersStr); + Cerr << "Counters: ================================ \n" << countersStr.Str() << Endl; + auto checkSingleCounter = [&](const TString& name, ui64 expected) { + auto counter = dbGroup->GetNamedCounter("name", name); + UNIT_ASSERT(counter); + UNIT_ASSERT_VALUES_EQUAL((ui64)counter->Val(), expected); + }; + checkSingleCounter("api.grpc.topic.stream_write.bytes", 20796); + checkSingleCounter("api.grpc.topic.stream_write.messages", 4); + { + auto group = dbGroup->GetSubgroup("name", "topic.write.lag_milliseconds"); + UNIT_ASSERT_VALUES_EQUAL((ui64)group->GetNamedCounter("bin", "2000")->Val(), 3); + UNIT_ASSERT_VALUES_EQUAL((ui64)group->GetNamedCounter("bin", "5000")->Val(), 1); + } + { + auto group = dbGroup->GetSubgroup("name", "topic.write.message_size_bytes"); + UNIT_ASSERT_VALUES_EQUAL((ui64)group->GetNamedCounter("bin", "1024")->Val(), 1); + UNIT_ASSERT_VALUES_EQUAL((ui64)group->GetNamedCounter("bin", "10240")->Val(), 2); + UNIT_ASSERT_VALUES_EQUAL((ui64)group->GetNamedCounter("bin", "20480")->Val(), 1); + } + } + } } |