diff options
author | alexbogo <alexbogo@ydb.tech> | 2023-02-15 10:43:33 +0300 |
---|---|---|
committer | alexbogo <alexbogo@ydb.tech> | 2023-02-15 10:43:33 +0300 |
commit | 8b2cb29ffbddf18649570d759756db6bf4100c34 (patch) | |
tree | 9460bd674f5e7b66af4e72d27a889e2eb62bc1e6 | |
parent | 8d5870f5f3b8a67afd44c3856560d521e6255bd0 (diff) | |
download | ydb-8b2cb29ffbddf18649570d759756db6bf4100c34.tar.gz |
optimization for a large number of consumers
init
-rw-r--r-- | ydb/core/persqueue/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 54 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 19 | ||||
-rw-r--r-- | ydb/core/persqueue/quota_tracker.cpp | 51 | ||||
-rw-r--r-- | ydb/core/persqueue/quota_tracker.h | 29 | ||||
-rw-r--r-- | ydb/core/persqueue/user_info.cpp | 17 | ||||
-rw-r--r-- | ydb/core/persqueue/user_info.h | 143 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/counters_ut.cpp | 30 |
10 files changed, 144 insertions, 202 deletions
diff --git a/ydb/core/persqueue/CMakeLists.darwin.txt b/ydb/core/persqueue/CMakeLists.darwin.txt index 7f0040544b..e9cdb1f022 100644 --- a/ydb/core/persqueue/CMakeLists.darwin.txt +++ b/ydb/core/persqueue/CMakeLists.darwin.txt @@ -52,6 +52,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt index ef7d925028..1c810979a4 100644 --- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt @@ -53,6 +53,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux.txt b/ydb/core/persqueue/CMakeLists.linux.txt index ef7d925028..1c810979a4 100644 --- a/ydb/core/persqueue/CMakeLists.linux.txt +++ b/ydb/core/persqueue/CMakeLists.linux.txt @@ -53,6 +53,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 25f4379bcb..66b5d08f51 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -764,7 +764,7 @@ void TPartition::Bootstrap(const TActorContext& ctx) { Partition)); } - UsersInfoStorage.Init(Tablet, SelfId()); + UsersInfoStorage.Init(Tablet, SelfId(), ctx); Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0); ui32 border = LEVEL0; @@ -1036,7 +1036,6 @@ void TPartition::UpdateAvailableSize(const TActorContext& ctx) { } } ScheduleUpdateAvailableSize(ctx); - ReportCounters(ctx); } void TPartition::HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActorContext& ctx) { @@ -1829,6 +1828,8 @@ void TPartition::InitComplete(const TActorContext& ctx) { if (Config.GetPartitionConfig().HasMirrorFrom()) { CreateMirrorerActor(); } + + ReportCounters(ctx); } @@ -2385,7 +2386,6 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize()); } ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release()); - ReportCounters(ctx); OnReadRequestFinished(std::move(info), answer.Size); } @@ -3026,13 +3026,11 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo } TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_OFFSET_IS_LOST].Increment(1); - ReportCounters(ctx); return; } if (userInfo.Offset >= (i64)EndOffset || StartOffset == EndOffset) { userInfo.ReadScheduled = false; - ReportCounters(ctx); return; } @@ -3149,7 +3147,6 @@ void TPartition::ProcessTimestampRead(const TActorContext& ctx) { ReadTimestampForOffset(user, *userInfo, ctx); } Y_VERIFY(ReadingTimestamp || UpdateUserInfoTimestamp.empty()); - ReportCounters(ctx); } @@ -3386,18 +3383,6 @@ void TPartition::ReportCounters(const TActorContext& ctx) { userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_BYTES].Set(speed); } - ui64 availSec = userInfo.ReadQuota.GetAvailableAvgSec(ctx.Now()); - if (availSec != userInfo.LabeledCounters->GetCounters()[METRIC_MIN_READ_QUOTA_BYTES_AVAIL_SEC].Get()) { - haveChanges = true; - userInfo.LabeledCounters->GetCounters()[METRIC_MIN_READ_QUOTA_BYTES_AVAIL_SEC].Set(availSec); - } - - ui64 availMin = userInfo.ReadQuota.GetAvailableAvgMin(ctx.Now()); - if (availMin != userInfo.LabeledCounters->GetCounters()[METRIC_MIN_READ_QUOTA_BYTES_AVAIL_MIN].Get()) { - haveChanges = true; - userInfo.LabeledCounters->GetCounters()[METRIC_MIN_READ_QUOTA_BYTES_AVAIL_MIN].Set(availMin); - } - ui64 readOffsetRewindSum = userInfo.ReadOffsetRewindSum; if (readOffsetRewindSum != userInfo.LabeledCounters->GetCounters()[METRIC_READ_OFFSET_REWIND_SUM].Get()) { haveChanges = true; @@ -3460,18 +3445,6 @@ void TPartition::ReportCounters(const TActorContext& ctx) { PartitionCountersLabeled->GetCounters()[METRIC_WRITE_QUOTA_BYTES].Set(speed); } - ui64 availSec = WriteQuota->GetAvailableAvgSec(ctx.Now()); - if (availSec != PartitionCountersLabeled->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_SEC].Get()) { - haveChanges = true; - PartitionCountersLabeled->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_SEC].Set(availSec); - } - - ui64 availMin = WriteQuota->GetAvailableAvgMin(ctx.Now()); - if (availMin != PartitionCountersLabeled->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_MIN].Get()) { - haveChanges = true; - PartitionCountersLabeled->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_MIN].Set(availMin); - } - ui32 id = METRIC_TOTAL_WRITE_SPEED_1; for (ui32 i = 0; i < AvgWriteBytes.size(); ++i) { ui64 avg = AvgWriteBytes[i].GetValue(); @@ -3942,6 +3915,8 @@ void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfi ProcessUserAct(act, ctx); } + + ReportCounters(ctx); } void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event, @@ -3985,6 +3960,7 @@ void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& } SchedulePartitionConfigChanged(); + ReportCounters(ctx); } void TPartition::InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event, @@ -4572,21 +4548,6 @@ void TPartition::ScheduleUpdateAvailableSize(const TActorContext& ctx) { ctx.Schedule(UPDATE_AVAIL_SIZE_INTERVAL, new TEvPQ::TEvUpdateAvailableSize()); } - -void TQuotaTracker::Update(const TInstant& timestamp) { - ui64 ms = (timestamp - LastUpdateTime).MilliSeconds(); - LastUpdateTime += TDuration::MilliSeconds(ms); - - if (AvailableSize < 0) { - QuotedTime += ms; - } - - AvailableSize = Min<i64>(AvailableSize + (ui64)SpeedPerSecond * ms / 1000, MaxBurst); - AvgMin.Update(AvailableSize, timestamp.MilliSeconds()); - AvgSec.Update(AvailableSize, timestamp.MilliSeconds()); -} - - void TPartition::HandleWriteResponse(const TActorContext& ctx) { Y_VERIFY(CurrentStateFunc() == &TThis::StateWrite); @@ -4656,8 +4617,6 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { ProcessTimestampsForNewData(prevEndOffset, ctx); - ReportCounters(ctx); - HandleWrites(ctx); } @@ -5625,7 +5584,6 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_HEAD_ONLY].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds()); TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize()); ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release()); - ReportCounters(ctx); OnReadRequestFinished(std::move(info), answer.Size); return; } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index df1b1c3963..4a68000f90 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -1,21 +1,24 @@ #pragma once -#include <util/generic/set.h> - -#include <library/cpp/actors/core/actor.h> -#include <library/cpp/actors/core/hfunc.h> -#include <library/cpp/actors/core/log.h> -#include <library/cpp/sliding_window/sliding_window.h> -#include <ydb/core/keyvalue/keyvalue_events.h> -#include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h> #include "blob.h" #include "header.h" #include "key.h" #include "partition_types.h" +#include "quota_tracker.h" #include "sourceid.h" #include "subscriber.h" #include "user_info.h" +#include <ydb/core/keyvalue/keyvalue_events.h> +#include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/sliding_window/sliding_window.h> + +#include <util/generic/set.h> + namespace NKikimr::NPQ { diff --git a/ydb/core/persqueue/quota_tracker.cpp b/ydb/core/persqueue/quota_tracker.cpp new file mode 100644 index 0000000000..2728670a18 --- /dev/null +++ b/ydb/core/persqueue/quota_tracker.cpp @@ -0,0 +1,51 @@ +#include "quota_tracker.h"
+
+
+namespace NKikimr::NPQ {
+ TQuotaTracker::TQuotaTracker(const ui64 maxBurst, const ui64 speedPerSecond, const TInstant timestamp)
+ : AvailableSize(maxBurst)
+ , SpeedPerSecond(speedPerSecond)
+ , LastUpdateTime(timestamp)
+ , MaxBurst(maxBurst)
+ , QuotedTime(0)
+ {}
+
+ void TQuotaTracker::UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond) {
+ SpeedPerSecond = speedPerSecond;
+ MaxBurst = maxBurst;
+ AvailableSize = maxBurst;
+ }
+
+ void TQuotaTracker::Update(const TInstant timestamp) {
+ ui64 ms = (timestamp - LastUpdateTime).MilliSeconds();
+ LastUpdateTime = timestamp;
+
+ if (AvailableSize < 0) {
+ QuotedTime += ms;
+ }
+
+ AvailableSize = Min<i64>(AvailableSize + (ui64)SpeedPerSecond * ms / 1000, MaxBurst);
+ }
+
+ bool TQuotaTracker::CanExaust() const {
+ return AvailableSize > 0;
+ }
+
+ void TQuotaTracker::Exaust(const ui64 size, const TInstant timestamp) {
+ Update(timestamp);
+ AvailableSize -= (i64)size;
+ Update(timestamp);
+ }
+
+ ui64 TQuotaTracker::GetQuotedTime() const {
+ return QuotedTime;
+ }
+
+ ui64 TQuotaTracker::GetTotalSpeed() const {
+ return SpeedPerSecond;
+ }
+
+} // NKikimr::NPQ
+
+
+
diff --git a/ydb/core/persqueue/quota_tracker.h b/ydb/core/persqueue/quota_tracker.h new file mode 100644 index 0000000000..8301a1877a --- /dev/null +++ b/ydb/core/persqueue/quota_tracker.h @@ -0,0 +1,29 @@ +#pragma once
+
+#include <util/datetime/base.h>
+
+
+namespace NKikimr::NPQ {
+ class TQuotaTracker {
+ public:
+ TQuotaTracker(const ui64 maxBurst, const ui64 speedPerSecond, const TInstant timestamp);
+
+ void UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond);
+ void Update(const TInstant timestamp);
+
+ bool CanExaust() const;
+ void Exaust(const ui64 size, const TInstant timestamp);
+
+ ui64 GetQuotedTime() const;
+ ui64 GetTotalSpeed() const;
+
+ private:
+ i64 AvailableSize;
+ ui64 SpeedPerSecond;
+ TInstant LastUpdateTime;
+ ui64 MaxBurst;
+
+ ui64 QuotedTime;
+ };
+
+} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp index 6cdb214c5d..a1b10ede2f 100644 --- a/ydb/core/persqueue/user_info.cpp +++ b/ydb/core/persqueue/user_info.cpp @@ -56,16 +56,19 @@ TUsersInfoStorage::TUsersInfoStorage( Counters.Populate(counters); } -void TUsersInfoStorage::Init(TActorId tabletActor, TActorId partitionActor) { +void TUsersInfoStorage::Init(TActorId tabletActor, TActorId partitionActor, const TActorContext& ctx) { + Y_VERIFY(UsersInfo.empty()); Y_VERIFY(!TabletActor); Y_VERIFY(!PartitionActor); TabletActor = tabletActor; PartitionActor = partitionActor; - for (auto& userInfoPair : UsersInfo) { - auto& userInfo = userInfoPair.second; - Y_VERIFY(!userInfo.ReadSpeedLimiter); - userInfo.ReadSpeedLimiter = CreateReadSpeedLimiter(userInfo.User); + if (AppData(ctx)->Counters && AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { + StreamCountersSubgroup = NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, IsServerless); + auto subgroups = NPersQueue::GetSubgroupsForTopic(TopicConverter, CloudId, DbId, DbPath, FolderId); + for (auto& group : subgroups) { + StreamCountersSubgroup = StreamCountersSubgroup->GetSubgroup(group.first, group.second); + } } } @@ -181,8 +184,8 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx, return { - ctx, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, TopicConverter, Partition, - session, gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, CloudId, DbId, DbPath, IsServerless, FolderId, + ctx, StreamCountersSubgroup, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, TopicConverter, Partition, + session, gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, DbPath, meterRead, burst, speed }; } diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 7184f967cc..e60b13b79f 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -3,6 +3,7 @@ #include "working_time_counter.h" #include "subscriber.h" #include "percentile_counter.h" +#include "quota_tracker.h" #include "read_speed_limiter.h" #include "metering_sink.h" @@ -32,111 +33,6 @@ static const TString CLIENTID_WITHOUT_CONSUMER = "$without_consumer"; typedef TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor> TUserLabeledCounters; - -class TQuotaTracker { - - class TAvgTracker { - public: - TAvgTracker(ui64 duration) - : Duration(duration) - , Sum(0) - { - Y_VERIFY(duration > 0); - } - - void Update(i64 value, i64 ts) - { - Values.push_back(std::make_pair(value, ts)); - i64 newStart = ts - Duration; - if (Values.size() > 1) { - Sum += GetSum(Values.size() - 2); - Y_VERIFY(Values.back().second >= Values.back().second); - } - while (Values.size() > 2 && newStart > Values[1].second) { - Sum -= GetSum(0); - Values.pop_front(); - } - } - - ui64 GetAvg() { - return (Values.size() > 1 && Values.back().second > Values.front().second) - ? Max<i64>(0, Sum / (Values.back().second - Values.front().second)) - : 0; - } - - private: - - i64 GetSum(ui32 pos) { - Y_VERIFY(pos + 1 < Values.size()); - return (Values[pos + 1].first + Values[pos].first) * (Values[pos + 1].second - Values[pos].second) / 2; - } - - private: - ui64 Duration; - i64 Sum; - std::deque<std::pair<i64, i64>> Values; - }; - - -public: - TQuotaTracker(const ui64 maxBurst, const ui64 speedPerSecond, const TInstant& timestamp) - : AvailableSize(maxBurst) - , SpeedPerSecond(speedPerSecond) - , LastUpdateTime(timestamp) - , MaxBurst(maxBurst) - , AvgMin(60'000) //avg avail in bytes per sec for last minute - , AvgSec(1000) //avg avail in bytes per sec - , QuotedTime(0) - {} - - ui64 GetQuotedTime() const { - return QuotedTime; - } - - void UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond) { - SpeedPerSecond = speedPerSecond; - MaxBurst = maxBurst; - AvailableSize = maxBurst; - } - - void Update(const TInstant& timestamp); - - bool CanExaust() const { - return AvailableSize > 0; - } - - void Exaust(const ui64 size, const TInstant& timestamp) { - Update(timestamp); - AvailableSize -= (i64)size; - Update(timestamp); - } - - ui64 GetAvailableAvgSec(const TInstant& timestamp) { - Update(timestamp); - return AvgSec.GetAvg(); - } - - ui64 GetAvailableAvgMin(const TInstant& timestamp) { - Update(timestamp); - return AvgMin.GetAvg(); - } - - ui64 GetTotalSpeed() const { - return SpeedPerSecond; - } - -private: - i64 AvailableSize; - ui64 SpeedPerSecond; - TInstant LastUpdateTime; - ui64 MaxBurst; - - TAvgTracker AvgMin; - TAvgTracker AvgSec; - - ui64 QuotedTime; -}; - struct TReadSpeedLimiterHolder { TReadSpeedLimiterHolder(const TActorId& actor, const TTabletCountersBase& baseline) : Actor(actor) @@ -262,11 +158,12 @@ struct TUserInfo { } TUserInfo( - const TActorContext& ctx, THolder<TReadSpeedLimiterHolder> readSpeedLimiter, const TString& user, + const TActorContext& ctx, + NMonitoring::TDynamicCounterPtr streamCountersSubgroup, THolder<TReadSpeedLimiterHolder> readSpeedLimiter, const TString& user, const ui64 readRuleGeneration, const bool important, const NPersQueue::TTopicConverterPtr& topicConverter, const ui32 partition, const TString &session, ui32 gen, ui32 step, i64 offset, const ui64 readOffsetRewindSum, const TString& dcId, TInstant readFromTimestamp, - const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId, bool meterRead, + const TString& dbPath, bool meterRead, ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000 ) : ReadSpeedLimiter(std::move(readSpeedLimiter)) @@ -305,7 +202,7 @@ struct TUserInfo { LabeledCounters.Reset(new TUserLabeledCounters( user + "||" + topicConverter->GetClientsideName(), partition, dbPath)); - SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, dbPath, isServerless, folderId); + SetupStreamCounters(streamCountersSubgroup); } else { LabeledCounters.Reset(new TUserLabeledCounters( user + "/" + (important ? "1" : "0") + "/" + topicConverter->GetClientsideName(), @@ -316,13 +213,9 @@ struct TUserInfo { } } - void SetupStreamCounters( - const TActorContext& ctx, const TString& dcId, const TString& partition, - const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId - ) { - auto subgroup = NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless); - auto subgroups = - NPersQueue::GetSubgroupsForTopic(TopicConverter, cloudId, dbId, dbPath, folderId); + void SetupStreamCounters(NMonitoring::TDynamicCounterPtr subgroup) { + Y_VERIFY(subgroup); + TVector<std::pair<TString, TString>> subgroups; if (DoInternalRead) { subgroups.push_back({"consumer", User}); @@ -338,20 +231,11 @@ struct TUserInfo { MsgsRead = TMultiCounter(subgroup, {}, subgroups, {"topic.read.messages"}, true, "name"); } - Y_UNUSED(dcId); - Y_UNUSED(partition); - /* - Counter.SetCounter(subgroup, - {{"database", dbPath}, {"cloud_id", cloudId}, {"folder_id", folderId}, {"database_id", dbId}, - {"topic", TopicConverter->GetFederationPath()}, - {"consumer", User}, {"host", dcId}, {"partition", partition}}, - {"name", "topic.read.awaiting_consume_milliseconds", true}); - */ - - subgroups.push_back({"name", "topic.read.lag_milliseconds"}); + + subgroups.emplace_back("name", "topic.read.lag_milliseconds"); ReadTimeLag.reset(new TPercentileCounter( - NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless), {}, subgroups, "bin", - TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"}, + subgroup, {}, subgroups, "bin", + TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"}, {1000, "1000"}, {2000, "2000"}, {5000, "5000"}, {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"}, @@ -496,7 +380,7 @@ public: const TTabletCountersBase& counters, const NKikimrPQ::TPQTabletConfig& config, const TString& CloudId, const TString& DbId, const TString& DbPath, const bool isServerless, const TString& FolderId); - void Init(TActorId tabletActor, TActorId partitionActor); + void Init(TActorId tabletActor, TActorId partitionActor, const TActorContext& ctx); void ParseDeprecated(const TString& key, const TString& data, const TActorContext& ctx); void Parse(const TString& key, const TString& data, const TActorContext& ctx); @@ -541,6 +425,7 @@ private: NPersQueue::TTopicConverterPtr TopicConverter; const ui32 Partition; TTabletCountersBase Counters; + NMonitoring::TDynamicCounterPtr StreamCountersSubgroup; TMaybe<TActorId> TabletActor; TMaybe<TActorId> PartitionActor; diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp index 0c351f5106..04aae14c11 100644 --- a/ydb/core/persqueue/ut/counters_ut.cpp +++ b/ydb/core/persqueue/ut/counters_ut.cpp @@ -147,9 +147,8 @@ void CompareJsons(const TString& inputStr, const TString& referenceStr) { NJson::TJsonValue inputJson; UNIT_ASSERT(NJson::ReadJsonTree(TStringBuf(inputStr), &inputJson)); - // Run time of test differs as well as counters below. We check if they are in - // probable interval [4500; 5500], set it to 5000 and then compare with reference - // string. + // Run time of test differs as well as counters below. + // We set it to 5000 and then compare with reference string. auto getByPath = [](const NJson::TJsonValue& msg, TStringBuf path) { NJson::TJsonValue ret; UNIT_ASSERT_C(msg.GetValueByPath(path, ret), path); @@ -162,20 +161,31 @@ void CompareJsons(const TString& inputStr, const TString& referenceStr) { getByPath(sensor, "labels.sensor") == "PQ/PartitionLifeTimeMs" || getByPath(sensor, "labels.sensor") == "PQ/TotalTimeLagMsByLastRead" || getByPath(sensor, "labels.sensor") == "PQ/WriteTimeLagMsByLastReadOld")) { - auto value = sensor["value"].GetIntegerSafe(); - UNIT_ASSERT_GT(value, 4500); - UNIT_ASSERT_LT(value, 5500); sensor.SetValueByPath("value", 5000); } else if (getByPath(sensor, "kind") == "GAUGE" && (getByPath(sensor, "labels.sensor") == "PQ/WriteTimeLagMsByLastRead" || getByPath(sensor, "labels.sensor") == "PQ/WriteTimeLagMsByLastWrite")) { - auto value = sensor["value"].GetIntegerSafe(); - UNIT_ASSERT_GT_C(value, 100, "value is " << value); - UNIT_ASSERT_LT_C(value, 3000, "value is " << value); sensor.SetValueByPath("value", 30); } } - UNIT_ASSERT_VALUES_EQUAL(referenceJson, inputJson); + + Cerr << "Test diff count : " << inputJson["sensors"].GetArraySafe().size() + << " " << referenceJson["sensors"].GetArraySafe().size() << Endl; + + ui64 inCount = inputJson["sensors"].GetArraySafe().size(); + ui64 refCount = referenceJson["sensors"].GetArraySafe().size(); + for (ui64 i = 0; i < inCount && i < refCount; ++i) { + auto& in = inputJson["sensors"].GetArraySafe()[i]; + auto& ref = referenceJson["sensors"].GetArraySafe()[i]; + UNIT_ASSERT_VALUES_EQUAL_C(in["labels"], ref["labels"], TStringBuilder() << " at pos #" << i); + } + if (inCount > refCount) { + UNIT_ASSERT_C(false, inputJson["sensors"].GetArraySafe()[refCount].GetStringRobust()); + } else if (refCount > inCount) { + UNIT_ASSERT_C(false, referenceJson["sensors"].GetArraySafe()[inCount].GetStringRobust()); + } + + //UNIT_ASSERT_VALUES_EQUAL(referenceJson, inputJson); } Y_UNIT_TEST(Partition) { |