aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFloatingCrowbar <komels@ydb.tech>2024-06-18 10:07:19 +0300
committerGitHub <noreply@github.com>2024-06-18 10:07:19 +0300
commitd23ebbe747a462f1bb29afaf810ff41cb1d4cd96 (patch)
treef77ae65cff21d847b7ab85e3ecacfbab0e535c48
parent22b4f2dbdcb83cf996a418249f28a9d391101cf9 (diff)
downloadydb-d23ebbe747a462f1bb29afaf810ff41cb1d4cd96.tar.gz
Publish tx counters (#5534)
-rw-r--r--ydb/core/persqueue/events/internal.h2
-rw-r--r--ydb/core/persqueue/partition.cpp43
-rw-r--r--ydb/core/persqueue/partition.h22
-rw-r--r--ydb/core/persqueue/partition_init.cpp25
-rw-r--r--ydb/core/persqueue/partition_types.h1
-rw-r--r--ydb/core/persqueue/partition_write.cpp57
-rw-r--r--ydb/core/persqueue/percentile_counter.cpp89
-rw-r--r--ydb/core/persqueue/percentile_counter.h28
-rw-r--r--ydb/core/persqueue/ut/counters_ut.cpp68
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp95
10 files changed, 379 insertions, 51 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index f80db7b3f7..bf0354ec46 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -5,6 +5,7 @@
#include <ydb/core/base/row_version.h>
#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/core/persqueue/blob.h>
+#include <ydb/core/persqueue/percentile_counter.h>
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/sourceid_info.h>
#include <ydb/core/persqueue/metering_sink.h>
@@ -1080,6 +1081,7 @@ struct TEvPQ {
ui64 MessagesWrittenTotal;
ui64 MessagesWrittenGrpc;
TVector<ui64> MessagesSizes;
+ THolder<NPQ::TMultiBucketCounter> InputLags;
};
struct TEvGetWriteInfoError : public TEventLocal<TEvGetWriteInfoError, EvGetWriteInfoError> {
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index a015e98fbe..23827c5f49 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -202,12 +202,6 @@ TPartition::TPartition(ui64 tabletId, const TPartitionId& partition, const TActo
, InitDone(false)
, NewPartition(newPartition)
, Subscriber(partition, TabletCounters, Tablet)
- , WriteCycleSize(0)
- , WriteNewSize(0)
- , WriteNewSizeInternal(0)
- , WriteNewSizeUncompressed(0)
- , WriteNewMessages(0)
- , WriteNewMessagesInternal(0)
, DiskIsFull(false)
, SubDomainOutOfSpace(subDomainOutOfSpace)
, HasDataReqNum(0)
@@ -1018,6 +1012,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
response->MessagesWrittenTotal = MsgsWrittenTotal.Value();
response->MessagesWrittenGrpc = MsgsWrittenGrpc.Value();
response->MessagesSizes = std::move(MessageSize.GetValues());
+ response->InputLags = std::move(SupportivePartitionTimeLag);
ctx.Send(ev->Sender, response);
}
@@ -1850,6 +1845,36 @@ void TPartition::RunPersist() {
AddCmdWriteConfig(PersistRequest->Record);
}
if (PersistRequest->Record.CmdDeleteRangeSize() || PersistRequest->Record.CmdWriteSize() || PersistRequest->Record.CmdRenameSize()) {
+ // Apply counters
+ for (const auto& writeInfo : WriteInfosApplied) {
+ // writeTimeLag
+ if (InputTimeLag && writeInfo->InputLags) {
+ writeInfo->InputLags->UpdateTimestamp(ctx.Now().MilliSeconds());
+ for (const auto& values : writeInfo->InputLags->GetValues()) {
+ if (values.second)
+ InputTimeLag->IncFor(std::ceil(values.first), values.second);
+ }
+ }
+ //MessageSize
+ auto i = 0u;
+ for (auto range : MessageSize.GetRanges()) {
+ if (i >= writeInfo->MessagesSizes.size()) {
+ break;
+ }
+ MessageSize.IncFor(range, writeInfo->MessagesSizes[i++]);
+ }
+
+ // Bytes Written
+ BytesWrittenTotal.Inc(writeInfo->BytesWrittenTotal);
+ BytesWrittenGrpc.Inc(writeInfo->BytesWrittenGrpc);
+ BytesWrittenUncompressed.Inc(writeInfo->BytesWrittenUncompressed);
+ // Messages written
+ MsgsWrittenTotal.Inc(writeInfo->MessagesWrittenTotal);
+ MsgsWrittenGrpc.Inc(writeInfo->MessagesWrittenTotal);
+ }
+ WriteInfosApplied.clear();
+ //Done with counters.
+
ctx.Send(HaveWriteMsg ? BlobCache : Tablet, PersistRequest.Release());
KVWriteInProgress = true;
} else {
@@ -2044,7 +2069,7 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
return true;
}
-void TPartition::CommitWriteOperations(const TTransaction& t)
+void TPartition::CommitWriteOperations(TTransaction& t)
{
if (!t.WriteInfo) {
return;
@@ -2072,10 +2097,12 @@ void TPartition::CommitWriteOperations(const TTransaction& t)
.IgnoreQuotaDeadline = true,
.HeartbeatVersion = std::nullopt,
}, std::nullopt};
+ msg.Internal = true;
TMessage message(std::move(msg), ctx.Now() - TInstant::Zero());
UserActionAndTxPendingCommit.emplace_front(std::move(message));
}
+ WriteInfosApplied.emplace_back(std::move(t.WriteInfo));
}
void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t)
@@ -2370,7 +2397,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
return EProcessResult::Continue;
}
-void TPartition::ExecImmediateTx(const TTransaction& t)
+void TPartition::ExecImmediateTx(TTransaction& t)
{
--ImmediateTxCount;
auto& record = t.ProposeTransaction->Record;
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index b4ddc3de15..01933c08e0 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -28,6 +28,7 @@
namespace NKikimr::NPQ {
static const ui32 MAX_BLOB_PART_SIZE = 500_KB;
+static const ui32 DEFAULT_BUCKET_COUNTER_MULTIPLIER = 20;
using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
@@ -429,7 +430,7 @@ private:
void Handle(TEvPQ::TEvProcessChangeOwnerRequests::TPtr& ev, const TActorContext& ctx);
void StartProcessChangeOwnerRequests(const TActorContext& ctx);
- void CommitWriteOperations(const TTransaction& t);
+ void CommitWriteOperations(TTransaction& t);
void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
@@ -704,7 +705,7 @@ private:
[[nodiscard]] EProcessResult PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx);
- void ExecImmediateTx(const TTransaction& tx);
+ void ExecImmediateTx(TTransaction& tx);
EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg);
EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg);
@@ -757,6 +758,7 @@ private:
std::deque<TUserActionAndTransactionEvent> UserActionAndTransactionEvents;
std::deque<TUserActionAndTransactionEvent> UserActionAndTxPendingCommit;
+ TVector<THolder<TEvPQ::TEvGetWriteInfoResponse>> WriteInfosApplied;
THashMap<ui64, TSimpleSharedPtr<TTransaction>> TransactionsInflight;
THashMap<TActorId, TSimpleSharedPtr<TTransaction>> WriteInfosToTx;
@@ -814,14 +816,17 @@ private:
TSubscriber Subscriber;
TInstant WriteCycleStartTime;
- ui32 WriteCycleSize;
+ ui32 WriteCycleSize = 0;
ui32 WriteCycleSizeEstimate = 0;
ui32 WriteKeysSizeEstimate = 0;
- ui32 WriteNewSize;
- ui32 WriteNewSizeInternal;
- ui64 WriteNewSizeUncompressed;
- ui32 WriteNewMessages;
- ui32 WriteNewMessagesInternal;
+ ui32 WriteNewSize = 0;
+ ui32 WriteNewSizeFull = 0;
+ ui32 WriteNewSizeInternal = 0;
+ ui64 WriteNewSizeUncompressed = 0;
+ ui64 WriteNewSizeUncompressedFull = 0;
+
+ ui32 WriteNewMessages = 0;
+ ui32 WriteNewMessagesInternal = 0;
TInstant CurrentTimestamp;
@@ -860,6 +865,7 @@ private:
NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> WriteLagMs;
//ToDo - counters.
THolder<TPercentileCounter> InputTimeLag;
+ THolder<TMultiBucketCounter> SupportivePartitionTimeLag;
TPartitionHistogramWrapper MessageSize;
TPercentileCounter WriteLatency;
diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp
index 15d9964d0a..e914205436 100644
--- a/ydb/core/persqueue/partition_init.cpp
+++ b/ydb/core/persqueue/partition_init.cpp
@@ -8,7 +8,6 @@ namespace NKikimr::NPQ {
static const ui32 LEVEL0 = 32;
static const TString WRITE_QUOTA_ROOT_PATH = "write-quota";
-
bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev);
void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key);
void RequestDataRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key);
@@ -789,7 +788,7 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
subGroup = GetServiceCounters(counters, "pqproxy|writeInfo");
{
- std::unique_ptr<TPercentileCounter> percentileCounter(new TPercentileCounter(
+ std::unique_ptr<TPercentileCounter> percentileCounter(new TPercentileCounter(
subGroup, labels, {{"sensor", "MessageSize" + suffix}}, "Size",
TVector<std::pair<ui64, TString>>{
{1_KB, "1kb"}, {5_KB, "5kb"}, {10_KB, "10kb"},
@@ -869,15 +868,21 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
subgroups.push_back({"name", "topic.write.lag_milliseconds"});
- InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), {},
- subgroups, "bin",
- TVector<std::pair<ui64, TString>>{
- {100, "100"}, {200, "200"}, {500, "500"},
- {1000, "1000"}, {2000, "2000"}, {5000, "5000"},
- {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"},
- {180'000,"180000"}, {9'999'999, "999999"}}, true));
+ if (IsSupportive()) {
+ SupportivePartitionTimeLag = MakeHolder<TMultiBucketCounter>(
+ TVector<ui64>{100, 200, 500, 1000, 2000, 5000, 10'000, 30'000, 60'000, 180'000, 9'999'999},
+ DEFAULT_BUCKET_COUNTER_MULTIPLIER, ctx.Now().MilliSeconds());
+ } else {
+ InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {},
+ subgroups, "bin",
+ TVector<std::pair<ui64, TString>>{
+ {100, "100"}, {200, "200"}, {500, "500"},
+ {1000, "1000"}, {2000, "2000"}, {5000, "5000"},
+ {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"},
+ {180'000,"180000"}, {9'999'999, "999999"}}, true));
+ }
subgroups.back().second = "topic.write.message_size_bytes";
{
std::unique_ptr<TPercentileCounter> percentileCounter(new TPercentileCounter(
diff --git a/ydb/core/persqueue/partition_types.h b/ydb/core/persqueue/partition_types.h
index 9ad5dded83..c43b147ecb 100644
--- a/ydb/core/persqueue/partition_types.h
+++ b/ydb/core/persqueue/partition_types.h
@@ -20,6 +20,7 @@ struct TWriteMsg {
TMaybe<ui64> Offset;
TEvPQ::TEvWrite::TMsg Msg;
std::optional<ui64> InitialSeqNo;
+ bool Internal = false;
};
struct TOwnershipMsg {
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 3a06980de9..2d52126041 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -312,7 +312,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
", Offset: " << offset << " is " << (already ? "already written" : "stored on disk")
);
- if (PartitionWriteQuotaWaitCounter) {
+ if (PartitionWriteQuotaWaitCounter && !writeResponse.Internal) {
PartitionWriteQuotaWaitCounter->IncFor(PartitionQuotaWaitTimeForCurrentBlob.MilliSeconds());
}
if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion)
@@ -524,15 +524,20 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
"TPartition::HandleWriteResponse writeNewSize# " << WriteNewSize;
);
+ if (SupportivePartitionTimeLag) {
+ SupportivePartitionTimeLag->UpdateTimestamp(now.MilliSeconds());
+ }
if (SplitMergeEnabled(Config)) {
- SplitMergeAvgWriteBytes->Update(WriteNewSize, now);
+ SplitMergeAvgWriteBytes->Update(WriteNewSizeFull, now);
auto needScaling = CheckScaleStatus(ctx);
ChangeScaleStatusIfNeeded(needScaling);
}
WriteCycleSize = 0;
WriteNewSize = 0;
+ WriteNewSizeFull = 0;
WriteNewSizeInternal = 0;
WriteNewSizeUncompressed = 0;
+ WriteNewSizeUncompressedFull = 0;
WriteNewMessages = 0;
WriteNewMessagesInternal = 0;
UpdateWriteBufferIsFullState(now);
@@ -1044,14 +1049,17 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
<< ". Writing seqNo: " << sourceId.UpdatedSeqNo()
<< ". EndOffset: " << EndOffset << ". CurOffset: " << curOffset << ". Offset: " << poffset
);
-
- TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
- MsgsDiscarded.Inc();
- TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
- BytesDiscarded.Inc(p.Msg.Data.size());
+ if (!p.Internal) {
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
+ MsgsDiscarded.Inc();
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
+ BytesDiscarded.Inc(p.Msg.Data.size());
+ }
} else {
- TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
- TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size());
+ if (!p.Internal) {
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size());
+ }
}
TString().swap(p.Msg.Data);
@@ -1153,14 +1161,17 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
ctx.Send(Tablet, new TEvents::TEvPoisonPill());
return false;
}
-
- WriteNewSize += p.Msg.SourceId.size() + p.Msg.Data.size();
- WriteNewSizeInternal += p.Msg.External ? 0 : (p.Msg.SourceId.size() + p.Msg.Data.size());
- WriteNewSizeUncompressed += p.Msg.UncompressedSize + p.Msg.SourceId.size();
- if (p.Msg.PartNo == 0) {
- ++WriteNewMessages;
- if (!p.Msg.External)
- ++WriteNewMessagesInternal;
+ WriteNewSizeFull += p.Msg.SourceId.size() + p.Msg.Data.size();
+ WriteNewSizeUncompressedFull += p.Msg.UncompressedSize + p.Msg.SourceId.size();
+ if (!p.Internal) {
+ WriteNewSize += p.Msg.SourceId.size() + p.Msg.Data.size();
+ WriteNewSizeUncompressed += p.Msg.UncompressedSize + p.Msg.SourceId.size();
+ WriteNewSizeInternal += p.Msg.External ? 0 : (p.Msg.SourceId.size() + p.Msg.Data.size());
+ }
+ if (p.Msg.PartNo == 0 && !p.Internal) {
+ ++WriteNewMessages;
+ if (!p.Msg.External)
+ ++WriteNewMessagesInternal;
}
TMaybe<TPartData> partData;
@@ -1176,13 +1187,14 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
const ui64 writeLagMs =
(WriteTimestamp - TInstant::MilliSeconds(p.Msg.CreateTimestamp)).MilliSeconds();
WriteLagMs.Update(writeLagMs, WriteTimestamp);
- if (InputTimeLag) {
+ if (InputTimeLag && !p.Internal) {
InputTimeLag->IncFor(writeLagMs, 1);
- if (p.Msg.PartNo == 0) {
- MessageSize.IncFor(p.Msg.TotalSize + p.Msg.SourceId.size(), 1);
- }
+ } else if (SupportivePartitionTimeLag) {
+ SupportivePartitionTimeLag->Insert(writeLagMs, 1);
+ }
+ if (p.Msg.PartNo == 0 && !p.Internal) {
+ MessageSize.IncFor(p.Msg.TotalSize + p.Msg.SourceId.size(), 1);
}
-
bool lastBlobPart = blob.IsLastPart();
//will return compacted tmp blob
@@ -1616,6 +1628,7 @@ void TPartition::BeginAppendHeadWithNewWrites(const TActorContext& ctx)
WriteCycleSize = 0;
WriteNewSize = 0;
WriteNewSizeUncompressed = 0;
+ WriteNewSizeUncompressed = 0;
WriteNewMessages = 0;
UpdateWriteBufferIsFullState(ctx.Now());
CurrentTimestamp = ctx.Now();
diff --git a/ydb/core/persqueue/percentile_counter.cpp b/ydb/core/persqueue/percentile_counter.cpp
index 9cd4e7f7d2..85edf4a950 100644
--- a/ydb/core/persqueue/percentile_counter.cpp
+++ b/ydb/core/persqueue/percentile_counter.cpp
@@ -193,10 +193,99 @@ TVector<ui64> TPartitionHistogramWrapper::GetValues() const {
}
return res;
}
+const TVector<ui64>& TPartitionHistogramWrapper::GetRanges() const {
+ Y_ABORT_UNLESS(!IsSupportivePartition);
+ return Histogram->Ranges;
+}
TPartitionHistogramWrapper::operator bool() const {
return Inited && (IsSupportivePartition || Histogram);
}
+
+ui64 TMultiBucketCounter::InsertWithHint(double value, ui64 count, ui64 hint) noexcept {
+ if (!count) {
+ return hint;
+ }
+ while (hint < Buckets.size()) {
+ if (Buckets[hint].Range < value) {
+ ++hint;
+ continue;
+ } else {
+ break;
+ }
+ }
+ auto& bucket = Buckets[hint];
+ auto newAvg = (bucket.AvgValue * bucket.ValuesCount + count * value) / (bucket.ValuesCount + count);
+
+ bucket.ValuesCount += count;
+ bucket.AvgValue = newAvg;
+ return hint;
+}
+
+void TMultiBucketCounter::UpdateTimestamp(ui64 newTimeRef)
+{
+ if (newTimeRef <= TimeReference) { // Cannot update in the past
+ return;
+ }
+ ui64 timeDiff = newTimeRef - TimeReference;
+ auto oldBuckets = std::move(Buckets);
+ Buckets = TVector<TBucket>(oldBuckets.size());
+ for (auto i = 0u; i < Buckets.size(); ++i) {
+ Buckets[i].Range = oldBuckets[i].Range;
+ }
+ ui64 hint = 0;
+ for (const auto& b : oldBuckets) {
+ hint = InsertWithHint(b.AvgValue + timeDiff, b.ValuesCount, hint);
+ }
+ TimeReference = newTimeRef;
+}
+
+TMultiBucketCounter::TMultiBucketCounter(const TVector<ui64>& buckets, ui64 multiplier, ui64 timeRef)
+ : Buckets(buckets.size() * multiplier + 1)
+ , TimeReference(timeRef)
+{
+ ui64 prev = 0;
+ ui64 i = 0;
+ for (auto b : buckets) {
+ ui64 step = (b - prev) / multiplier;
+ for (auto j = 1u; j < multiplier; ++j) {
+ Buckets[i++].Range = prev + j * step;
+ }
+ Buckets[i++].Range = b;
+ prev = b;
+ }
+ Buckets[i++].Range = std::numeric_limits<ui64>::max();
+}
+
+void TMultiBucketCounter::Insert(i64 value, ui64 count) noexcept {
+ if (value < 0) {
+ InsertWithHint(0, count, 0);
+ return;
+ }
+
+ ui64 begin = 0, end = Buckets.size() - 1;
+ while (end - begin > 10) {
+ ui64 median = begin + (end - begin) / 2;
+ if (Buckets[median].Range >= (ui64)value) {
+ end = median;
+ } else {
+ begin = median;
+ }
+ }
+ InsertWithHint((ui64)value, count, begin);
+}
+
+TVector<std::pair<double, ui64>> TMultiBucketCounter::GetValues(bool allowZeroes) const noexcept {
+ TVector<std::pair<double, ui64>> result;
+ for (const auto b: Buckets) {
+ if (!allowZeroes && b.ValuesCount == 0) {
+ continue;
+ }
+ result.push_back(std::make_pair(b.AvgValue, b.ValuesCount));
+ }
+ return result;
+}
+
} // NPQ
} // NKikimr
diff --git a/ydb/core/persqueue/percentile_counter.h b/ydb/core/persqueue/percentile_counter.h
index a06dde78dc..89ec2a2ddd 100644
--- a/ydb/core/persqueue/percentile_counter.h
+++ b/ydb/core/persqueue/percentile_counter.h
@@ -97,8 +97,36 @@ public:
iter++;
}
}
+ const TVector<ui64>& GetRanges() const;
operator bool() const;
};
+
+class TMultiBucketCounter {
+private:
+ struct TBucket {
+ ui64 Range;
+ double AvgValue = 0;
+ ui64 ValuesCount = 0;
+ TBucket() = default;
+ explicit TBucket(ui64 range)
+ : Range(range)
+ , AvgValue(0)
+ , ValuesCount(0)
+ {}
+ };
+ TVector<TBucket> Buckets;
+ ui64 TimeReference;
+
+ ui64 InsertWithHint(double value, ui64 count, ui64 hint) noexcept;
+
+public:
+ TMultiBucketCounter(const TVector<ui64>& buckets, ui64 multiplier, ui64 timeRef);
+ void UpdateTimestamp(ui64 newTimeReference);
+ void Insert(i64 value, ui64 count) noexcept;
+ TVector<std::pair<double, ui64>> GetValues(bool allowZeroes = false) const noexcept;
+
+};
+
}// NPQ
}// NKikimr
diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp
index 7a15805c84..a2d38248a4 100644
--- a/ydb/core/persqueue/ut/counters_ut.cpp
+++ b/ydb/core/persqueue/ut/counters_ut.cpp
@@ -4,6 +4,8 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/mon/sync_http_mon.h>
#include <ydb/core/persqueue/ut/common/pq_ut_common.h>
+#include <ydb/core/persqueue/percentile_counter.h>
+#include <ydb/core/persqueue/partition.h>
#include <ydb/core/sys_view/service/sysview_service.h>
#include <ydb/core/testlib/fake_scheme_shard.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
@@ -185,7 +187,7 @@ Y_UNIT_TEST(PartitionFirstClass) {
dbGroup->OutputHtml(countersStr);
TString referenceCounters = NResource::Find(TStringBuf("counters_pqproxy_firstclass.html"));
- UNIT_ASSERT_VALUES_EQUAL(countersStr.Str() + "\n", referenceCounters);
+ UNIT_ASSERT_EQUAL(countersStr.Str() + "\n", referenceCounters);
}
{
@@ -427,8 +429,7 @@ Y_UNIT_TEST(PartitionFirstClass) {
->GetSubgroup("database", "/Root")
->GetSubgroup("cloud_id", "cloud_id")
->GetSubgroup("folder_id", "folder_id")
- ->GetSubgroup("database_id", "database_id")
- ->GetSubgroup("topic", "topic");
+ ->GetSubgroup("database_id", "database_id")->GetSubgroup("topic", "topic");
group->GetNamedCounter("name", "topic.partition.uptime_milliseconds_min", false)->Set(30000);
group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(600);
group->GetNamedCounter("name", "topic.partition.uptime_milliseconds_min", false)->Set(30000);
@@ -560,4 +561,65 @@ Y_UNIT_TEST(ImportantFlagSwitching) {
} // Y_UNIT_TEST_SUITE(PQCountersLabeled)
+Y_UNIT_TEST_SUITE(TMultiBucketCounter) {
+void CheckBucketsValues(const TVector<std::pair<double, ui64>>& actual, const TVector<std::pair<double, ui64>>& expected) {
+ UNIT_ASSERT_VALUES_EQUAL(actual.size(), expected.size());
+ for (auto i = 0u; i < expected.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(actual[i].second, expected[i].second);
+ UNIT_ASSERT_C(abs(actual[i].first - expected[i].first) < 0.0001, TStringBuilder() << actual[i].first << "-" << expected[i].first);
+ }
+}
+
+Y_UNIT_TEST(InsertAndUpdate) {
+ TMultiBucketCounter counter({100, 200, 500, 1000, 5000}, 5, 0);
+ counter.Insert(19, 3);
+ counter.Insert(15, 1);
+ counter.Insert(17, 1);
+
+ counter.Insert(100, 1);
+ counter.Insert(50001, 5);
+
+ CheckBucketsValues(counter.GetValues(), {{(19*3 + 15 + 17) / 5.0, 5}, {100, 1}, {50001, 5}});
+
+ counter.UpdateTimestamp(50);
+
+ CheckBucketsValues(counter.GetValues(), {{50.0 + (19*3 + 15 + 17) / 5.0, 5}, {150, 1}, {50051, 5}});
+ counter.Insert(190, 1);
+ counter.Insert(155, 1);
+
+ CheckBucketsValues(counter.GetValues(), {{50.0 + (19*3 + 15 + 17) / 5.0, 5}, {152.5, 2}, {190, 1}, {50051, 5}});
+
+ counter.UpdateTimestamp(1050);
+
+ CheckBucketsValues(counter.GetValues(), {{(1067.8 * 5 + 1152.5 * 2 + 1190) / 8, 8}, {51051, 5}});
+}
+
+Y_UNIT_TEST(ManyCounters) {
+ TVector<ui64> buckets = {100, 200, 500, 1000, 2000, 5000};
+ ui64 multiplier = 20;
+ TMultiBucketCounter counter(buckets, multiplier, 0);
+ for (auto i = 1u; i <= 5000; i++) {
+ counter.Insert(1, 1);
+ counter.UpdateTimestamp(i);
+ }
+ counter.Insert(1, 1);
+
+ const auto& values = counter.GetValues(true);
+ ui64 prev = 0;
+ for (auto i = 0u; i < buckets.size(); i++) {
+ ui64 sum = 0u;
+ for (auto j = 0u; j < multiplier; j++) {
+ sum += values[i * multiplier + j].second;
+ }
+ Cerr << "Bucket: " << buckets[i] << " elems count: " << sum << Endl;
+ ui64 bucketSize = buckets[i] - prev;
+ prev = buckets[i];
+ i64 diff = sum - bucketSize;
+ UNIT_ASSERT(std::abs(diff) < (i64)(bucketSize / 10));
+ }
+
+}
+
+} // Y_UNIT_TEST_SUITE(TMultiBucketCounter)
+
} // namespace NKikimr::NPQ
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index f7474278bc..c365822e16 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -42,6 +42,7 @@
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/include/client.h>
#include <thread>
@@ -7225,5 +7226,99 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(diff >= timeNeededToWaitQuotaAfterReadToEnd * 2);
}
+ Y_UNIT_TEST(TxCounters) {
+ TServerSettings settings = PQSettings(0, 1);
+ settings.PQConfig.SetTopicsAreFirstClassCitizen(true);
+ settings.PQConfig.SetRoot("/Root");
+ settings.PQConfig.SetDatabase("/Root");
+ settings.SetEnableTopicServiceTx(true);
+ NPersQueue::TTestServer server{settings, true};
+
+ server.EnableLogs({ NKikimrServices::PERSQUEUE }, NActors::NLog::PRI_INFO);
+
+
+ NYdb::TDriverConfig config;
+ config.SetEndpoint(TStringBuilder() << "localhost:" + ToString(server.GrpcPort));
+ config.SetDatabase("/Root");
+ config.SetAuthToken("root@builtin");
+ auto driver = NYdb::TDriver(config);
+
+ NYdb::NTable::TTableClient client(driver);
+ NYdb::NTopic::TTopicClient topicClient(driver);
+
+ auto result = client.CreateSession().ExtractValueSync();
+ auto tableSession = result.GetSession();
+ auto txResult = tableSession.BeginTransaction().ExtractValueSync();
+ auto tx = txResult.GetTransaction();
+
+ TString topic = "topic";
+ NYdb::NTopic::TCreateTopicSettings createSettings;
+ createSettings.BeginConfigurePartitioningSettings()
+ .MinActivePartitions(1)
+ .MaxActivePartitions(1);
+
+ auto status = topicClient.CreateTopic(topic, createSettings).GetValueSync();
+ UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToString());
+ server.WaitInit(topic);
+
+ NYdb::NTopic::TWriteSessionSettings options;
+ options.Path(topic);
+ options.ProducerId("123");
+ auto writer = topicClient.CreateWriteSession(options);
+
+ auto send = [&](ui64 dataSize, const TDuration& writeLag) {
+ while (true) {
+ auto msg = writer->GetEvent(true);
+ UNIT_ASSERT(msg);
+ auto ev = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*msg);
+ if (!ev)
+ continue;
+ TString data("a", dataSize);
+ NYdb::NTopic::TWriteMessage writeMsg{data};
+ writeMsg.CreateTimestamp(TInstant::Now() - writeLag);
+ writeMsg.Codec = NYdb::NTopic::ECodec::RAW;
+ writeMsg.Tx(tx);
+ writer->Write(std::move(ev->ContinuationToken), std::move(writeMsg));
+ return;
+ }
+ };
+ send(10, TDuration::MilliSeconds(1001));
+ send(5 * 1024 + 1, TDuration::MilliSeconds(1101));
+ send(5 * 1024 + 1, TDuration::MilliSeconds(1201));
+ send(10241, TDuration::MilliSeconds(2505));
+ writer->Close();
+
+ auto commitResult = tx.Commit().ExtractValueSync();
+ UNIT_ASSERT(commitResult.GetStatus() == NYdb::EStatus::SUCCESS);
+
+ auto counters = server.GetRuntime()->GetAppData(0).Counters;
+ auto serviceCounters = GetServiceCounters(counters, "datastreams", false);
+ auto dbGroup = serviceCounters->GetSubgroup("database", "/Root")
+ ->GetSubgroup("cloud_id", "")
+ ->GetSubgroup("folder_id", "")
+ ->GetSubgroup("database_id", "")->GetSubgroup("topic", "topic");
+ TStringStream countersStr;
+ dbGroup->OutputHtml(countersStr);
+ Cerr << "Counters: ================================ \n" << countersStr.Str() << Endl;
+ auto checkSingleCounter = [&](const TString& name, ui64 expected) {
+ auto counter = dbGroup->GetNamedCounter("name", name);
+ UNIT_ASSERT(counter);
+ UNIT_ASSERT_VALUES_EQUAL((ui64)counter->Val(), expected);
+ };
+ checkSingleCounter("api.grpc.topic.stream_write.bytes", 20796);
+ checkSingleCounter("api.grpc.topic.stream_write.messages", 4);
+ {
+ auto group = dbGroup->GetSubgroup("name", "topic.write.lag_milliseconds");
+ UNIT_ASSERT_VALUES_EQUAL((ui64)group->GetNamedCounter("bin", "2000")->Val(), 3);
+ UNIT_ASSERT_VALUES_EQUAL((ui64)group->GetNamedCounter("bin", "5000")->Val(), 1);
+ }
+ {
+ auto group = dbGroup->GetSubgroup("name", "topic.write.message_size_bytes");
+ UNIT_ASSERT_VALUES_EQUAL((ui64)group->GetNamedCounter("bin", "1024")->Val(), 1);
+ UNIT_ASSERT_VALUES_EQUAL((ui64)group->GetNamedCounter("bin", "10240")->Val(), 2);
+ UNIT_ASSERT_VALUES_EQUAL((ui64)group->GetNamedCounter("bin", "20480")->Val(), 1);
+ }
+ }
+
}
}