diff options
author | komels <komels@yandex-team.ru> | 2022-03-18 15:19:58 +0300 |
---|---|---|
committer | komels <komels@yandex-team.ru> | 2022-03-18 15:19:58 +0300 |
commit | e0ddcb1e3c9d4c86b210bc1c5a51bc74c0f1feeb (patch) | |
tree | a8937fc19c6f7f55da2bf6834875527a37c29459 | |
parent | 395dc6b898267eeedc05cb440827a22b6f71ec69 (diff) | |
download | ydb-e0ddcb1e3c9d4c86b210bc1c5a51bc74c0f1feeb.tar.gz |
Expiring ymq counters LOGBROKER-6987
ref:d68d767da082009458d6050f75e06aa9861621d4
-rw-r--r-- | ydb/core/ymq/actor/action.h | 9 | ||||
-rw-r--r-- | ydb/core/ymq/actor/proxy_actor.cpp | 6 | ||||
-rw-r--r-- | ydb/core/ymq/actor/service.cpp | 46 | ||||
-rw-r--r-- | ydb/core/ymq/base/counters.cpp | 58 | ||||
-rw-r--r-- | ydb/core/ymq/base/counters.h | 19 |
5 files changed, 106 insertions, 32 deletions
diff --git a/ydb/core/ymq/actor/action.h b/ydb/core/ymq/actor/action.h index 1801c98d68..ccdd99ba83 100644 --- a/ydb/core/ymq/actor/action.h +++ b/ydb/core/ymq/actor/action.h @@ -221,19 +221,18 @@ protected: } else if (UserCounters_) { INC_COUNTER(UserCounters_, RequestTimeouts); } else { - TIntrusivePtrCntrCouple rootCounters { + TIntrusivePtrCntrCouple rootCounters{ SqsCoreCounters_ ? SqsCoreCounters_ : GetSqsServiceCounters(AppData()->Counters, "core"), GetYmqPublicCounters(AppData()->Counters) }; - auto [userCounters, queueCounters] = GetUserAndQueueCounters(rootCounters, TQueuePath(Cfg().GetRoot(), UserName_, GetQueueName())); + auto[userCounters, queueCounters] = GetUserAndQueueCounters(rootCounters, + TQueuePath(Cfg().GetRoot(), UserName_, + GetQueueName())); if (queueCounters.SqsCounters) { queueCounters.SqsCounters->GetCounter("RequestTimeouts", true)->Inc(); } else if (userCounters.SqsCounters) { userCounters.SqsCounters->GetCounter("RequestTimeouts", true)->Inc(); } - if (queueCounters.YmqCounters) { - queueCounters.YmqCounters->GetCounter("RequestTimeouts", true)->Inc(); - } } MakeError(MutableErrorDesc(), NErrors::TIMEOUT); diff --git a/ydb/core/ymq/actor/proxy_actor.cpp b/ydb/core/ymq/actor/proxy_actor.cpp index 98d85210a5..b482f88cce 100644 --- a/ydb/core/ymq/actor/proxy_actor.cpp +++ b/ydb/core/ymq/actor/proxy_actor.cpp @@ -155,9 +155,9 @@ void TProxyActor::HandleWakeup(TEvWakeup::TPtr&) { if (queueCountersCouple.SqsCounters) { queueCountersCouple.SqsCounters->GetCounter("RequestTimeouts", true)->Inc(); } - if (queueCountersCouple.YmqCounters) { - queueCountersCouple.YmqCounters->GetCounter("request_timeouts_count_per_second", true)->Inc(); - } +// if (queueCountersCouple.YmqCounters) { +// queueCountersCouple.YmqCounters->GetCounter("request_timeouts_count_per_second", true)->Inc(); +// } } SendErrorAndDie(NErrors::TIMEOUT); diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp index 34c9a4ae46..2757d23a38 100644 --- a/ydb/core/ymq/actor/service.cpp +++ b/ydb/core/ymq/actor/service.cpp @@ -71,6 +71,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> { TQueueInfo( TString userName, TString queueName, TString rootUrl, ui64 leaderTabletId, TString customName, TString folderId, ui64 version, ui64 shardsCount, const TIntrusivePtr<TUserCounters>& userCounters, + const TIntrusivePtr<TFolderCounters>& folderCounters, const TActorId& schemeCache, TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> quoterResourcesForUser, bool insertCounters ) @@ -84,6 +85,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> { , LeaderTabletId_(leaderTabletId) , Counters_(userCounters->CreateQueueCounters(QueueName_, FolderId_, insertCounters)) , UserCounters_(userCounters) + , FolderCounters_(folderCounters) , SchemeCache_(schemeCache) , QuoterResourcesForUser_(std::move(quoterResourcesForUser)) { @@ -129,14 +131,10 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> { LocalLeader_ = TActivationContext::Register(new TQueueLeader(UserName_, QueueName_, FolderId_, RootUrl_, Counters_, UserCounters_, SchemeCache_, QuoterResourcesForUser_)); LOG_SQS_INFO("Start local leader [" << UserName_ << "/" << QueueName_ << "] actor " << LocalLeader_); - // ToDo: Should better make TFolderCounters struct and move it there. - // Will have to refactor TQueueCounters a bit, since it directly works with TUserCounters if (FolderId_) { - auto folderCounters = GetFolderCounters(UserCounters_->UserCounters, FolderId_); - if (folderCounters.YmqCounters) { - auto counter = folderCounters.YmqCounters->GetCounter("queue.total_count", false); - (*counter)++; - } + Y_VERIFY(FolderCounters_); + FolderCounters_->InitCounters(); + INC_COUNTER(FolderCounters_, total_count); } } } @@ -149,14 +147,8 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> { TActivationContext::Send(new IEventHandle(LocalLeader_, SelfId(), new TEvPoisonPill())); LocalLeader_ = TActorId(); if (FolderId_) { - auto folderCounters = GetFolderCounters(UserCounters_->UserCounters, FolderId_); - if (folderCounters.YmqCounters) { - auto counter = folderCounters.YmqCounters->GetCounter("queue.total_count", false); - counter->Dec(); - if (counter->Val() == 0) { - RemoveFolderCounters(UserCounters_->UserCounters, FolderId_); - } - } + Y_VERIFY(FolderCounters_); + DEC_COUNTER(FolderCounters_, total_count); } } } @@ -188,6 +180,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> { ui64 LeaderTabletId_ = 0; TIntrusivePtr<TQueueCounters> Counters_; TIntrusivePtr<TUserCounters> UserCounters_; + TIntrusivePtr<TFolderCounters> FolderCounters_; TActorId PipeClient_; TActorId LeaderPipeServer_; TActorId LocalLeader_; @@ -251,6 +244,7 @@ struct TSqsService::TUserInfo : public TAtomicRefCount<TUserInfo> { std::shared_ptr<const std::map<TString, TString>> Settings_ = std::make_shared<const std::map<TString, TString>>(); TIntrusivePtr<TUserCounters> Counters_; std::map<TString, TSqsService::TQueueInfoPtr> Queues_; + std::map<TString, TIntrusivePtr<TFolderCounters>> FolderCounters_; THashMap<std::pair<TString, TString>, TSqsService::TQueueInfoPtr> QueueByNameAndFolder_; // <custom name, folder id> -> queue info TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> QuoterResources_; TLocalRateLimiterResource CreateObjectsQuoterResource_; @@ -1024,7 +1018,10 @@ void TSqsService::RemoveQueue(const TString& userName, const TString& queue) { queuePtr->GetLeaderNodeRequests_.clear(); LeaderTabletIdToQueue_.erase(queuePtr->LeaderTabletId_); userIt->second->QueueByNameAndFolder_.erase(std::make_pair(queuePtr->CustomName_, queuePtr->FolderId_)); - + auto queuesCount = userIt->second->CountQueuesInFolder(queuePtr->FolderId_); + if (!queuesCount) { + userIt->second->FolderCounters_.erase(queuePtr->FolderId_); + } userIt->second->Queues_.erase(queueIt); queuePtr->Counters_->RemoveCounters(); } @@ -1041,15 +1038,24 @@ std::map<TString, TSqsService::TQueueInfoPtr>::iterator TSqsService::AddQueue(co const TInstant now = TActivationContext::Now(); const TInstant timeToInsertCounters = createdTimestamp + TDuration::MilliSeconds(Cfg().GetQueueCountersExportDelayMs()); const bool insertCounters = now >= timeToInsertCounters; - auto ret = user->Queues_.insert(std::make_pair(queue, TQueueInfoPtr(new TQueueInfo(userName, queue, RootUrl_, leaderTabletId, customName, folderId, version, shardsCount, user->Counters_, SchemeCache_, user->QuoterResources_, insertCounters)))).first; - auto queueInfo = ret->second; - LeaderTabletIdToQueue_[leaderTabletId] = queueInfo; - user->QueueByNameAndFolder_.emplace(std::make_pair(customName, folderId), queueInfo); + auto folderCntrIter = user->FolderCounters_.find(folderId); + if (folderCntrIter == user->FolderCounters_.end()) { + folderCntrIter = user->FolderCounters_.insert(std::make_pair(folderId, user->Counters_->CreateFolderCounters(folderId, true))).first; + } if (!insertCounters) { Schedule(timeToInsertCounters - now, new TSqsEvents::TEvInsertQueueCounters(userName, queue, leaderTabletId)); } + auto ret = user->Queues_.insert(std::make_pair(queue, TQueueInfoPtr(new TQueueInfo( + userName, queue, RootUrl_, leaderTabletId, customName, folderId, version, shardsCount, + user->Counters_, folderCntrIter->second, SchemeCache_, user->QuoterResources_, insertCounters))) + ).first; + + auto queueInfo = ret->second; + LeaderTabletIdToQueue_[leaderTabletId] = queueInfo; + user->QueueByNameAndFolder_.emplace(std::make_pair(customName, folderId), queueInfo); + { auto requests = user->GetLeaderNodeRequests_.equal_range(queue); for (auto i = requests.first; i != requests.second; ++i) { diff --git a/ydb/core/ymq/base/counters.cpp b/ydb/core/ymq/base/counters.cpp index 90a9f5000f..36486efcca 100644 --- a/ydb/core/ymq/base/counters.cpp +++ b/ydb/core/ymq/base/counters.cpp @@ -377,7 +377,10 @@ ELaziness Lazy(const NKikimrConfig::TSqsConfig& cfg) { #define INIT_COUNTERS_COUPLE_WITH_NAMES(rootCounters, sqsCounter, ymqCounter, sqsName, ymqName, expiring, valueType, lazy, aggr) \ sqsCounter.Init(rootCounters.SqsCounters, expiring, valueType, sqsName, lazy); \ if (rootCounters.YmqCounters && !aggr) { \ - ymqCounter.Init(rootCounters.YmqCounters, expiring, valueType, DEFAULT_YMQ_COUNTER_NAME, ymqName, ELaziness::OnStart); \ + ymqCounter.Init( \ + rootCounters.YmqCounters, ELifetime::Expiring, valueType, DEFAULT_YMQ_COUNTER_NAME, ymqName, \ + ELaziness::OnStart \ + ); \ } #define INIT_COUNTER(rootCounters, variable, expiring, valueType, lazy) \ @@ -389,7 +392,7 @@ ELaziness Lazy(const NKikimrConfig::TSqsConfig& cfg) { #define INIT_HISTOGRAMS_COUPLE_WITH_NAMES(rootCounters, sqsHistogram, ymqHistogram, sqsName, ymqName, expiring, sqsBuckets, ymqBuckets, lazy, aggr) \ sqsHistogram.Init(rootCounters.SqsCounters, expiring, sqsBuckets, sqsName, lazy); \ if (rootCounters.YmqCounters && !aggr) { \ - ymqHistogram.Init(rootCounters.YmqCounters, expiring, ymqBuckets, DEFAULT_YMQ_COUNTER_NAME, ymqName, lazy); \ + ymqHistogram.Init(rootCounters.YmqCounters, ELifetime::Expiring, ymqBuckets, DEFAULT_YMQ_COUNTER_NAME, ymqName, lazy); \ } #define INIT_HISTOGRAMS_COUPLE_WITH_BUCKETS(rootCounters, sqsHistogram, ymqHistogram, expiring, sqsBuckets, ymqBuckets, lazy, aggr) \ @@ -425,10 +428,10 @@ void TYmqActionCounters::Init( SubGroup = rootCounters->GetSubgroup(labelName, methodName); INIT_COUNTER_WITH_NAME_AND_LABEL( SubGroup, Success, DEFAULT_YMQ_COUNTER_NAME, TStringBuilder() << namePrefix << "requests_count_per_second", - lifetime,EValueType::Derivative, ELaziness::OnStart); + lifetime, EValueType::Derivative, ELaziness::OnStart); INIT_COUNTER_WITH_NAME_AND_LABEL( SubGroup, Errors, DEFAULT_YMQ_COUNTER_NAME, TStringBuilder() << namePrefix << "errors_count_per_second", - lifetime,EValueType::Derivative, ELaziness::OnStart + lifetime, EValueType::Derivative, ELaziness::OnStart ); // ! - No inflight counter @@ -513,6 +516,49 @@ void TAPIStatusesCounters::SetAggregatedParent(TAPIStatusesCounters* parent) { UnknownCounter.SetAggregatedParent(parent ? &parent->UnknownCounter : nullptr); } +TFolderCounters::TFolderCounters(const TUserCounters* userCounters, const TString& folderId, bool insertCounters) + : UserCounters(userCounters->UserCounters) + , FolderId(folderId) +{ + if (insertCounters) { + FolderCounters = GetFolderCounters(UserCounters, folderId); + } else { + FolderCounters = {new NMonitoring::TDynamicCounters(), new NMonitoring::TDynamicCounters()}; + } + //InitCounters(); +} + +void TFolderCounters::InitCounters() { + if (Inited) { + return; + } + Inited = true; + + InsertCounters(); + if (UserCounters.YmqCounters) { + total_count.Init( + FolderCounters.YmqCounters, ELifetime::Expiring, EValueType::Absolute, DEFAULT_COUNTER_NAME, + "queue.total_count", + ELaziness::OnStart + ); + } +} + + +void TFolderCounters::InsertCounters() { + + if (UserCounters.Defined()) { + if (!UserCounters.SqsCounters->FindSubgroup(FOLDER_LABEL, FolderId)) { + FolderCounters.SqsCounters->ResetCounters(); + UserCounters.SqsCounters->RegisterSubgroup(FOLDER_LABEL, FolderId, FolderCounters.SqsCounters); + } + if (UserCounters.YmqCounters && !UserCounters.YmqCounters->FindSubgroup(FOLDER_LABEL, FolderId)) { + FolderCounters.YmqCounters->ResetCounters(); + UserCounters.YmqCounters->RegisterSubgroup(FOLDER_LABEL, FolderId, FolderCounters.YmqCounters); + } + } +} + TQueueCounters::TQueueCounters(const NKikimrConfig::TSqsConfig& cfg, const TIntrusivePtrCntrCouple& rootCounters, const TUserCounters* userCounters, @@ -790,6 +836,10 @@ void TUserCounters::TDetailedCounters::SetAggregatedParent(TUserCounters::TDetai UserDetailedCountersDescriptor.SetAggregatedParent(this, parent); } +TIntrusivePtr<TFolderCounters> TUserCounters::CreateFolderCounters(const TString& folderId, bool insertCounters) { + return new TFolderCounters(this, folderId, insertCounters); +} + TIntrusivePtr<TQueueCounters> TUserCounters::CreateQueueCounters(const TString& queueName, const TString& folderId, bool insertCounters) { auto counters = CreateQueueCountersImpl(queueName, folderId, insertCounters, IsAggregatedCounters); counters->SetAggregatedParent(AggregatedQueueCounters); diff --git a/ydb/core/ymq/base/counters.h b/ydb/core/ymq/base/counters.h index 3c2667b241..8851cdff71 100644 --- a/ydb/core/ymq/base/counters.h +++ b/ydb/core/ymq/base/counters.h @@ -20,6 +20,7 @@ namespace NKikimr::NSQS { struct TUserCounters; +struct TFolderCounters; struct TQueueCounters; struct THttpCounters; struct TCloudAuthCounters; @@ -508,6 +509,7 @@ struct TUserCounters : public TAtomicRefCount<TUserCounters> { } TIntrusivePtr<TQueueCounters> CreateQueueCounters(const TString& queueName, const TString& folderId, bool insertCounters); + TIntrusivePtr<TFolderCounters> CreateFolderCounters(const TString& folderId, bool insertCounters); void RemoveCounters(); @@ -554,6 +556,23 @@ private: bool IsAggregatedCounters; }; +struct TFolderCounters : public TAtomicRefCount<TFolderCounters> { + TLazyCachedCounter total_count; // Total queues in folder. + + TFolderCounters(const TUserCounters* userCounters, const TString& folderId, bool insertCounters); + + void InitCounters(); + void InsertCounters(); + +private: + TIntrusivePtrCntrCouple UserCounters; // User tree in core subsystem + TIntrusivePtrCntrCouple FolderCounters; // Folder tree in core subsystem + + const TString FolderId; + bool Inited = false; + +}; + // Queue counters in SQS core subsystem. struct TQueueCounters : public TAtomicRefCount<TQueueCounters> { // Action types are declared in ydb/core/ymq/base/action.h. |