aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexbogo <alexbogo@ydb.tech>2023-02-15 10:43:33 +0300
committeralexbogo <alexbogo@ydb.tech>2023-02-15 10:43:33 +0300
commit8b2cb29ffbddf18649570d759756db6bf4100c34 (patch)
tree9460bd674f5e7b66af4e72d27a889e2eb62bc1e6
parent8d5870f5f3b8a67afd44c3856560d521e6255bd0 (diff)
downloadydb-8b2cb29ffbddf18649570d759756db6bf4100c34.tar.gz
optimization for a large number of consumers
init
-rw-r--r--ydb/core/persqueue/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux.txt1
-rw-r--r--ydb/core/persqueue/partition.cpp54
-rw-r--r--ydb/core/persqueue/partition.h19
-rw-r--r--ydb/core/persqueue/quota_tracker.cpp51
-rw-r--r--ydb/core/persqueue/quota_tracker.h29
-rw-r--r--ydb/core/persqueue/user_info.cpp17
-rw-r--r--ydb/core/persqueue/user_info.h143
-rw-r--r--ydb/core/persqueue/ut/counters_ut.cpp30
10 files changed, 144 insertions, 202 deletions
diff --git a/ydb/core/persqueue/CMakeLists.darwin.txt b/ydb/core/persqueue/CMakeLists.darwin.txt
index 7f0040544b..e9cdb1f022 100644
--- a/ydb/core/persqueue/CMakeLists.darwin.txt
+++ b/ydb/core/persqueue/CMakeLists.darwin.txt
@@ -52,6 +52,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp
diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
index ef7d925028..1c810979a4 100644
--- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
@@ -53,6 +53,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp
diff --git a/ydb/core/persqueue/CMakeLists.linux.txt b/ydb/core/persqueue/CMakeLists.linux.txt
index ef7d925028..1c810979a4 100644
--- a/ydb/core/persqueue/CMakeLists.linux.txt
+++ b/ydb/core/persqueue/CMakeLists.linux.txt
@@ -53,6 +53,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 25f4379bcb..66b5d08f51 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -764,7 +764,7 @@ void TPartition::Bootstrap(const TActorContext& ctx) {
Partition));
}
- UsersInfoStorage.Init(Tablet, SelfId());
+ UsersInfoStorage.Init(Tablet, SelfId(), ctx);
Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0);
ui32 border = LEVEL0;
@@ -1036,7 +1036,6 @@ void TPartition::UpdateAvailableSize(const TActorContext& ctx) {
}
}
ScheduleUpdateAvailableSize(ctx);
- ReportCounters(ctx);
}
void TPartition::HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActorContext& ctx) {
@@ -1829,6 +1828,8 @@ void TPartition::InitComplete(const TActorContext& ctx) {
if (Config.GetPartitionConfig().HasMirrorFrom()) {
CreateMirrorerActor();
}
+
+ ReportCounters(ctx);
}
@@ -2385,7 +2386,6 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c
TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize());
}
ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release());
- ReportCounters(ctx);
OnReadRequestFinished(std::move(info), answer.Size);
}
@@ -3026,13 +3026,11 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
}
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_OFFSET_IS_LOST].Increment(1);
- ReportCounters(ctx);
return;
}
if (userInfo.Offset >= (i64)EndOffset || StartOffset == EndOffset) {
userInfo.ReadScheduled = false;
- ReportCounters(ctx);
return;
}
@@ -3149,7 +3147,6 @@ void TPartition::ProcessTimestampRead(const TActorContext& ctx) {
ReadTimestampForOffset(user, *userInfo, ctx);
}
Y_VERIFY(ReadingTimestamp || UpdateUserInfoTimestamp.empty());
- ReportCounters(ctx);
}
@@ -3386,18 +3383,6 @@ void TPartition::ReportCounters(const TActorContext& ctx) {
userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_BYTES].Set(speed);
}
- ui64 availSec = userInfo.ReadQuota.GetAvailableAvgSec(ctx.Now());
- if (availSec != userInfo.LabeledCounters->GetCounters()[METRIC_MIN_READ_QUOTA_BYTES_AVAIL_SEC].Get()) {
- haveChanges = true;
- userInfo.LabeledCounters->GetCounters()[METRIC_MIN_READ_QUOTA_BYTES_AVAIL_SEC].Set(availSec);
- }
-
- ui64 availMin = userInfo.ReadQuota.GetAvailableAvgMin(ctx.Now());
- if (availMin != userInfo.LabeledCounters->GetCounters()[METRIC_MIN_READ_QUOTA_BYTES_AVAIL_MIN].Get()) {
- haveChanges = true;
- userInfo.LabeledCounters->GetCounters()[METRIC_MIN_READ_QUOTA_BYTES_AVAIL_MIN].Set(availMin);
- }
-
ui64 readOffsetRewindSum = userInfo.ReadOffsetRewindSum;
if (readOffsetRewindSum != userInfo.LabeledCounters->GetCounters()[METRIC_READ_OFFSET_REWIND_SUM].Get()) {
haveChanges = true;
@@ -3460,18 +3445,6 @@ void TPartition::ReportCounters(const TActorContext& ctx) {
PartitionCountersLabeled->GetCounters()[METRIC_WRITE_QUOTA_BYTES].Set(speed);
}
- ui64 availSec = WriteQuota->GetAvailableAvgSec(ctx.Now());
- if (availSec != PartitionCountersLabeled->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_SEC].Get()) {
- haveChanges = true;
- PartitionCountersLabeled->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_SEC].Set(availSec);
- }
-
- ui64 availMin = WriteQuota->GetAvailableAvgMin(ctx.Now());
- if (availMin != PartitionCountersLabeled->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_MIN].Get()) {
- haveChanges = true;
- PartitionCountersLabeled->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_MIN].Set(availMin);
- }
-
ui32 id = METRIC_TOTAL_WRITE_SPEED_1;
for (ui32 i = 0; i < AvgWriteBytes.size(); ++i) {
ui64 avg = AvgWriteBytes[i].GetValue();
@@ -3942,6 +3915,8 @@ void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfi
ProcessUserAct(act, ctx);
}
+
+ ReportCounters(ctx);
}
void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event,
@@ -3985,6 +3960,7 @@ void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig&
}
SchedulePartitionConfigChanged();
+ ReportCounters(ctx);
}
void TPartition::InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event,
@@ -4572,21 +4548,6 @@ void TPartition::ScheduleUpdateAvailableSize(const TActorContext& ctx) {
ctx.Schedule(UPDATE_AVAIL_SIZE_INTERVAL, new TEvPQ::TEvUpdateAvailableSize());
}
-
-void TQuotaTracker::Update(const TInstant& timestamp) {
- ui64 ms = (timestamp - LastUpdateTime).MilliSeconds();
- LastUpdateTime += TDuration::MilliSeconds(ms);
-
- if (AvailableSize < 0) {
- QuotedTime += ms;
- }
-
- AvailableSize = Min<i64>(AvailableSize + (ui64)SpeedPerSecond * ms / 1000, MaxBurst);
- AvgMin.Update(AvailableSize, timestamp.MilliSeconds());
- AvgSec.Update(AvailableSize, timestamp.MilliSeconds());
-}
-
-
void TPartition::HandleWriteResponse(const TActorContext& ctx) {
Y_VERIFY(CurrentStateFunc() == &TThis::StateWrite);
@@ -4656,8 +4617,6 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
ProcessTimestampsForNewData(prevEndOffset, ctx);
- ReportCounters(ctx);
-
HandleWrites(ctx);
}
@@ -5625,7 +5584,6 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_HEAD_ONLY].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize());
ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release());
- ReportCounters(ctx);
OnReadRequestFinished(std::move(info), answer.Size);
return;
}
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index df1b1c3963..4a68000f90 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -1,21 +1,24 @@
#pragma once
-#include <util/generic/set.h>
-
-#include <library/cpp/actors/core/actor.h>
-#include <library/cpp/actors/core/hfunc.h>
-#include <library/cpp/actors/core/log.h>
-#include <library/cpp/sliding_window/sliding_window.h>
-#include <ydb/core/keyvalue/keyvalue_events.h>
-#include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h>
#include "blob.h"
#include "header.h"
#include "key.h"
#include "partition_types.h"
+#include "quota_tracker.h"
#include "sourceid.h"
#include "subscriber.h"
#include "user_info.h"
+#include <ydb/core/keyvalue/keyvalue_events.h>
+#include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/sliding_window/sliding_window.h>
+
+#include <util/generic/set.h>
+
namespace NKikimr::NPQ {
diff --git a/ydb/core/persqueue/quota_tracker.cpp b/ydb/core/persqueue/quota_tracker.cpp
new file mode 100644
index 0000000000..2728670a18
--- /dev/null
+++ b/ydb/core/persqueue/quota_tracker.cpp
@@ -0,0 +1,51 @@
+#include "quota_tracker.h"
+
+
+namespace NKikimr::NPQ {
+ TQuotaTracker::TQuotaTracker(const ui64 maxBurst, const ui64 speedPerSecond, const TInstant timestamp)
+ : AvailableSize(maxBurst)
+ , SpeedPerSecond(speedPerSecond)
+ , LastUpdateTime(timestamp)
+ , MaxBurst(maxBurst)
+ , QuotedTime(0)
+ {}
+
+ void TQuotaTracker::UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond) {
+ SpeedPerSecond = speedPerSecond;
+ MaxBurst = maxBurst;
+ AvailableSize = maxBurst;
+ }
+
+ void TQuotaTracker::Update(const TInstant timestamp) {
+ ui64 ms = (timestamp - LastUpdateTime).MilliSeconds();
+ LastUpdateTime = timestamp;
+
+ if (AvailableSize < 0) {
+ QuotedTime += ms;
+ }
+
+ AvailableSize = Min<i64>(AvailableSize + (ui64)SpeedPerSecond * ms / 1000, MaxBurst);
+ }
+
+ bool TQuotaTracker::CanExaust() const {
+ return AvailableSize > 0;
+ }
+
+ void TQuotaTracker::Exaust(const ui64 size, const TInstant timestamp) {
+ Update(timestamp);
+ AvailableSize -= (i64)size;
+ Update(timestamp);
+ }
+
+ ui64 TQuotaTracker::GetQuotedTime() const {
+ return QuotedTime;
+ }
+
+ ui64 TQuotaTracker::GetTotalSpeed() const {
+ return SpeedPerSecond;
+ }
+
+} // NKikimr::NPQ
+
+
+
diff --git a/ydb/core/persqueue/quota_tracker.h b/ydb/core/persqueue/quota_tracker.h
new file mode 100644
index 0000000000..8301a1877a
--- /dev/null
+++ b/ydb/core/persqueue/quota_tracker.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <util/datetime/base.h>
+
+
+namespace NKikimr::NPQ {
+ class TQuotaTracker {
+ public:
+ TQuotaTracker(const ui64 maxBurst, const ui64 speedPerSecond, const TInstant timestamp);
+
+ void UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond);
+ void Update(const TInstant timestamp);
+
+ bool CanExaust() const;
+ void Exaust(const ui64 size, const TInstant timestamp);
+
+ ui64 GetQuotedTime() const;
+ ui64 GetTotalSpeed() const;
+
+ private:
+ i64 AvailableSize;
+ ui64 SpeedPerSecond;
+ TInstant LastUpdateTime;
+ ui64 MaxBurst;
+
+ ui64 QuotedTime;
+ };
+
+} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp
index 6cdb214c5d..a1b10ede2f 100644
--- a/ydb/core/persqueue/user_info.cpp
+++ b/ydb/core/persqueue/user_info.cpp
@@ -56,16 +56,19 @@ TUsersInfoStorage::TUsersInfoStorage(
Counters.Populate(counters);
}
-void TUsersInfoStorage::Init(TActorId tabletActor, TActorId partitionActor) {
+void TUsersInfoStorage::Init(TActorId tabletActor, TActorId partitionActor, const TActorContext& ctx) {
+ Y_VERIFY(UsersInfo.empty());
Y_VERIFY(!TabletActor);
Y_VERIFY(!PartitionActor);
TabletActor = tabletActor;
PartitionActor = partitionActor;
- for (auto& userInfoPair : UsersInfo) {
- auto& userInfo = userInfoPair.second;
- Y_VERIFY(!userInfo.ReadSpeedLimiter);
- userInfo.ReadSpeedLimiter = CreateReadSpeedLimiter(userInfo.User);
+ if (AppData(ctx)->Counters && AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
+ StreamCountersSubgroup = NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, IsServerless);
+ auto subgroups = NPersQueue::GetSubgroupsForTopic(TopicConverter, CloudId, DbId, DbPath, FolderId);
+ for (auto& group : subgroups) {
+ StreamCountersSubgroup = StreamCountersSubgroup->GetSubgroup(group.first, group.second);
+ }
}
}
@@ -181,8 +184,8 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx,
return {
- ctx, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, TopicConverter, Partition,
- session, gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, CloudId, DbId, DbPath, IsServerless, FolderId,
+ ctx, StreamCountersSubgroup, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, TopicConverter, Partition,
+ session, gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, DbPath,
meterRead, burst, speed
};
}
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 7184f967cc..e60b13b79f 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -3,6 +3,7 @@
#include "working_time_counter.h"
#include "subscriber.h"
#include "percentile_counter.h"
+#include "quota_tracker.h"
#include "read_speed_limiter.h"
#include "metering_sink.h"
@@ -32,111 +33,6 @@ static const TString CLIENTID_WITHOUT_CONSUMER = "$without_consumer";
typedef TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor> TUserLabeledCounters;
-
-class TQuotaTracker {
-
- class TAvgTracker {
- public:
- TAvgTracker(ui64 duration)
- : Duration(duration)
- , Sum(0)
- {
- Y_VERIFY(duration > 0);
- }
-
- void Update(i64 value, i64 ts)
- {
- Values.push_back(std::make_pair(value, ts));
- i64 newStart = ts - Duration;
- if (Values.size() > 1) {
- Sum += GetSum(Values.size() - 2);
- Y_VERIFY(Values.back().second >= Values.back().second);
- }
- while (Values.size() > 2 && newStart > Values[1].second) {
- Sum -= GetSum(0);
- Values.pop_front();
- }
- }
-
- ui64 GetAvg() {
- return (Values.size() > 1 && Values.back().second > Values.front().second)
- ? Max<i64>(0, Sum / (Values.back().second - Values.front().second))
- : 0;
- }
-
- private:
-
- i64 GetSum(ui32 pos) {
- Y_VERIFY(pos + 1 < Values.size());
- return (Values[pos + 1].first + Values[pos].first) * (Values[pos + 1].second - Values[pos].second) / 2;
- }
-
- private:
- ui64 Duration;
- i64 Sum;
- std::deque<std::pair<i64, i64>> Values;
- };
-
-
-public:
- TQuotaTracker(const ui64 maxBurst, const ui64 speedPerSecond, const TInstant& timestamp)
- : AvailableSize(maxBurst)
- , SpeedPerSecond(speedPerSecond)
- , LastUpdateTime(timestamp)
- , MaxBurst(maxBurst)
- , AvgMin(60'000) //avg avail in bytes per sec for last minute
- , AvgSec(1000) //avg avail in bytes per sec
- , QuotedTime(0)
- {}
-
- ui64 GetQuotedTime() const {
- return QuotedTime;
- }
-
- void UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond) {
- SpeedPerSecond = speedPerSecond;
- MaxBurst = maxBurst;
- AvailableSize = maxBurst;
- }
-
- void Update(const TInstant& timestamp);
-
- bool CanExaust() const {
- return AvailableSize > 0;
- }
-
- void Exaust(const ui64 size, const TInstant& timestamp) {
- Update(timestamp);
- AvailableSize -= (i64)size;
- Update(timestamp);
- }
-
- ui64 GetAvailableAvgSec(const TInstant& timestamp) {
- Update(timestamp);
- return AvgSec.GetAvg();
- }
-
- ui64 GetAvailableAvgMin(const TInstant& timestamp) {
- Update(timestamp);
- return AvgMin.GetAvg();
- }
-
- ui64 GetTotalSpeed() const {
- return SpeedPerSecond;
- }
-
-private:
- i64 AvailableSize;
- ui64 SpeedPerSecond;
- TInstant LastUpdateTime;
- ui64 MaxBurst;
-
- TAvgTracker AvgMin;
- TAvgTracker AvgSec;
-
- ui64 QuotedTime;
-};
-
struct TReadSpeedLimiterHolder {
TReadSpeedLimiterHolder(const TActorId& actor, const TTabletCountersBase& baseline)
: Actor(actor)
@@ -262,11 +158,12 @@ struct TUserInfo {
}
TUserInfo(
- const TActorContext& ctx, THolder<TReadSpeedLimiterHolder> readSpeedLimiter, const TString& user,
+ const TActorContext& ctx,
+ NMonitoring::TDynamicCounterPtr streamCountersSubgroup, THolder<TReadSpeedLimiterHolder> readSpeedLimiter, const TString& user,
const ui64 readRuleGeneration, const bool important, const NPersQueue::TTopicConverterPtr& topicConverter,
const ui32 partition, const TString &session, ui32 gen, ui32 step, i64 offset,
const ui64 readOffsetRewindSum, const TString& dcId, TInstant readFromTimestamp,
- const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId, bool meterRead,
+ const TString& dbPath, bool meterRead,
ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000
)
: ReadSpeedLimiter(std::move(readSpeedLimiter))
@@ -305,7 +202,7 @@ struct TUserInfo {
LabeledCounters.Reset(new TUserLabeledCounters(
user + "||" + topicConverter->GetClientsideName(), partition, dbPath));
- SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, dbPath, isServerless, folderId);
+ SetupStreamCounters(streamCountersSubgroup);
} else {
LabeledCounters.Reset(new TUserLabeledCounters(
user + "/" + (important ? "1" : "0") + "/" + topicConverter->GetClientsideName(),
@@ -316,13 +213,9 @@ struct TUserInfo {
}
}
- void SetupStreamCounters(
- const TActorContext& ctx, const TString& dcId, const TString& partition,
- const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId
- ) {
- auto subgroup = NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless);
- auto subgroups =
- NPersQueue::GetSubgroupsForTopic(TopicConverter, cloudId, dbId, dbPath, folderId);
+ void SetupStreamCounters(NMonitoring::TDynamicCounterPtr subgroup) {
+ Y_VERIFY(subgroup);
+ TVector<std::pair<TString, TString>> subgroups;
if (DoInternalRead) {
subgroups.push_back({"consumer", User});
@@ -338,20 +231,11 @@ struct TUserInfo {
MsgsRead = TMultiCounter(subgroup, {}, subgroups,
{"topic.read.messages"}, true, "name");
}
- Y_UNUSED(dcId);
- Y_UNUSED(partition);
- /*
- Counter.SetCounter(subgroup,
- {{"database", dbPath}, {"cloud_id", cloudId}, {"folder_id", folderId}, {"database_id", dbId},
- {"topic", TopicConverter->GetFederationPath()},
- {"consumer", User}, {"host", dcId}, {"partition", partition}},
- {"name", "topic.read.awaiting_consume_milliseconds", true});
- */
-
- subgroups.push_back({"name", "topic.read.lag_milliseconds"});
+
+ subgroups.emplace_back("name", "topic.read.lag_milliseconds");
ReadTimeLag.reset(new TPercentileCounter(
- NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless), {}, subgroups, "bin",
- TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"},
+ subgroup, {}, subgroups, "bin",
+ TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"},
{1000, "1000"}, {2000, "2000"},
{5000, "5000"}, {10'000, "10000"},
{30'000, "30000"}, {60'000, "60000"},
@@ -496,7 +380,7 @@ public:
const TTabletCountersBase& counters, const NKikimrPQ::TPQTabletConfig& config,
const TString& CloudId, const TString& DbId, const TString& DbPath, const bool isServerless, const TString& FolderId);
- void Init(TActorId tabletActor, TActorId partitionActor);
+ void Init(TActorId tabletActor, TActorId partitionActor, const TActorContext& ctx);
void ParseDeprecated(const TString& key, const TString& data, const TActorContext& ctx);
void Parse(const TString& key, const TString& data, const TActorContext& ctx);
@@ -541,6 +425,7 @@ private:
NPersQueue::TTopicConverterPtr TopicConverter;
const ui32 Partition;
TTabletCountersBase Counters;
+ NMonitoring::TDynamicCounterPtr StreamCountersSubgroup;
TMaybe<TActorId> TabletActor;
TMaybe<TActorId> PartitionActor;
diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp
index 0c351f5106..04aae14c11 100644
--- a/ydb/core/persqueue/ut/counters_ut.cpp
+++ b/ydb/core/persqueue/ut/counters_ut.cpp
@@ -147,9 +147,8 @@ void CompareJsons(const TString& inputStr, const TString& referenceStr) {
NJson::TJsonValue inputJson;
UNIT_ASSERT(NJson::ReadJsonTree(TStringBuf(inputStr), &inputJson));
- // Run time of test differs as well as counters below. We check if they are in
- // probable interval [4500; 5500], set it to 5000 and then compare with reference
- // string.
+ // Run time of test differs as well as counters below.
+ // We set it to 5000 and then compare with reference string.
auto getByPath = [](const NJson::TJsonValue& msg, TStringBuf path) {
NJson::TJsonValue ret;
UNIT_ASSERT_C(msg.GetValueByPath(path, ret), path);
@@ -162,20 +161,31 @@ void CompareJsons(const TString& inputStr, const TString& referenceStr) {
getByPath(sensor, "labels.sensor") == "PQ/PartitionLifeTimeMs" ||
getByPath(sensor, "labels.sensor") == "PQ/TotalTimeLagMsByLastRead" ||
getByPath(sensor, "labels.sensor") == "PQ/WriteTimeLagMsByLastReadOld")) {
- auto value = sensor["value"].GetIntegerSafe();
- UNIT_ASSERT_GT(value, 4500);
- UNIT_ASSERT_LT(value, 5500);
sensor.SetValueByPath("value", 5000);
} else if (getByPath(sensor, "kind") == "GAUGE" &&
(getByPath(sensor, "labels.sensor") == "PQ/WriteTimeLagMsByLastRead" ||
getByPath(sensor, "labels.sensor") == "PQ/WriteTimeLagMsByLastWrite")) {
- auto value = sensor["value"].GetIntegerSafe();
- UNIT_ASSERT_GT_C(value, 100, "value is " << value);
- UNIT_ASSERT_LT_C(value, 3000, "value is " << value);
sensor.SetValueByPath("value", 30);
}
}
- UNIT_ASSERT_VALUES_EQUAL(referenceJson, inputJson);
+
+ Cerr << "Test diff count : " << inputJson["sensors"].GetArraySafe().size()
+ << " " << referenceJson["sensors"].GetArraySafe().size() << Endl;
+
+ ui64 inCount = inputJson["sensors"].GetArraySafe().size();
+ ui64 refCount = referenceJson["sensors"].GetArraySafe().size();
+ for (ui64 i = 0; i < inCount && i < refCount; ++i) {
+ auto& in = inputJson["sensors"].GetArraySafe()[i];
+ auto& ref = referenceJson["sensors"].GetArraySafe()[i];
+ UNIT_ASSERT_VALUES_EQUAL_C(in["labels"], ref["labels"], TStringBuilder() << " at pos #" << i);
+ }
+ if (inCount > refCount) {
+ UNIT_ASSERT_C(false, inputJson["sensors"].GetArraySafe()[refCount].GetStringRobust());
+ } else if (refCount > inCount) {
+ UNIT_ASSERT_C(false, referenceJson["sensors"].GetArraySafe()[inCount].GetStringRobust());
+ }
+
+ //UNIT_ASSERT_VALUES_EQUAL(referenceJson, inputJson);
}
Y_UNIT_TEST(Partition) {