about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author komels <komels@yandex-team.ru> 2022-03-18 15:19:58 +0300
committer komels <komels@yandex-team.ru> 2022-03-18 15:19:58 +0300
commit e0ddcb1e3c9d4c86b210bc1c5a51bc74c0f1feeb (patch)
tree a8937fc19c6f7f55da2bf6834875527a37c29459
parent 395dc6b898267eeedc05cb440827a22b6f71ec69 (diff)
download ydb-e0ddcb1e3c9d4c86b210bc1c5a51bc74c0f1feeb.tar.gz
Expiring ymq counters LOGBROKER-6987
ref:d68d767da082009458d6050f75e06aa9861621d4
-rw-r--r-- ydb/core/ymq/actor/action.h 9
-rw-r--r-- ydb/core/ymq/actor/proxy_actor.cpp 6
-rw-r--r-- ydb/core/ymq/actor/service.cpp 46
-rw-r--r-- ydb/core/ymq/base/counters.cpp 58
-rw-r--r-- ydb/core/ymq/base/counters.h 19
5 files changed, 106 insertions, 32 deletions
diff --git a/ydb/core/ymq/actor/action.h b/ydb/core/ymq/actor/action.h
index 1801c98d68..ccdd99ba83 100644
--- a/ydb/core/ymq/actor/action.h
+++ b/ydb/core/ymq/actor/action.h
@@ -221,19 +221,18 @@ protected:
} else if (UserCounters_) {
INC_COUNTER(UserCounters_, RequestTimeouts);
} else {
- TIntrusivePtrCntrCouple rootCounters {
+ TIntrusivePtrCntrCouple rootCounters{
SqsCoreCounters_ ? SqsCoreCounters_ : GetSqsServiceCounters(AppData()->Counters, "core"),
GetYmqPublicCounters(AppData()->Counters)
};
- auto [userCounters, queueCounters] = GetUserAndQueueCounters(rootCounters, TQueuePath(Cfg().GetRoot(), UserName_, GetQueueName()));
+ auto[userCounters, queueCounters] = GetUserAndQueueCounters(rootCounters,
+ TQueuePath(Cfg().GetRoot(), UserName_,
+ GetQueueName()));
if (queueCounters.SqsCounters) {
queueCounters.SqsCounters->GetCounter("RequestTimeouts", true)->Inc();
} else if (userCounters.SqsCounters) {
userCounters.SqsCounters->GetCounter("RequestTimeouts", true)->Inc();
}
- if (queueCounters.YmqCounters) {
- queueCounters.YmqCounters->GetCounter("RequestTimeouts", true)->Inc();
- }
}
MakeError(MutableErrorDesc(), NErrors::TIMEOUT);
diff --git a/ydb/core/ymq/actor/proxy_actor.cpp b/ydb/core/ymq/actor/proxy_actor.cpp
index 98d85210a5..b482f88cce 100644
--- a/ydb/core/ymq/actor/proxy_actor.cpp
+++ b/ydb/core/ymq/actor/proxy_actor.cpp
@@ -155,9 +155,9 @@ void TProxyActor::HandleWakeup(TEvWakeup::TPtr&) {
if (queueCountersCouple.SqsCounters) {
queueCountersCouple.SqsCounters->GetCounter("RequestTimeouts", true)->Inc();
}
- if (queueCountersCouple.YmqCounters) {
- queueCountersCouple.YmqCounters->GetCounter("request_timeouts_count_per_second", true)->Inc();
- }
+// if (queueCountersCouple.YmqCounters) {
+// queueCountersCouple.YmqCounters->GetCounter("request_timeouts_count_per_second", true)->Inc();
+// }
}
SendErrorAndDie(NErrors::TIMEOUT);
diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp
index 34c9a4ae46..2757d23a38 100644
--- a/ydb/core/ymq/actor/service.cpp
+++ b/ydb/core/ymq/actor/service.cpp
@@ -71,6 +71,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
TQueueInfo(
TString userName, TString queueName, TString rootUrl, ui64 leaderTabletId, TString customName,
TString folderId, ui64 version, ui64 shardsCount, const TIntrusivePtr<TUserCounters>& userCounters,
+ const TIntrusivePtr<TFolderCounters>& folderCounters,
const TActorId& schemeCache, TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> quoterResourcesForUser,
bool insertCounters
)
@@ -84,6 +85,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
, LeaderTabletId_(leaderTabletId)
, Counters_(userCounters->CreateQueueCounters(QueueName_, FolderId_, insertCounters))
, UserCounters_(userCounters)
+ , FolderCounters_(folderCounters)
, SchemeCache_(schemeCache)
, QuoterResourcesForUser_(std::move(quoterResourcesForUser))
{
@@ -129,14 +131,10 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
LocalLeader_ = TActivationContext::Register(new TQueueLeader(UserName_, QueueName_, FolderId_, RootUrl_, Counters_, UserCounters_, SchemeCache_, QuoterResourcesForUser_));
LOG_SQS_INFO("Start local leader [" << UserName_ << "/" << QueueName_ << "] actor " << LocalLeader_);
- // ToDo: Should better make TFolderCounters struct and move it there.
- // Will have to refactor TQueueCounters a bit, since it directly works with TUserCounters
if (FolderId_) {
- auto folderCounters = GetFolderCounters(UserCounters_->UserCounters, FolderId_);
- if (folderCounters.YmqCounters) {
- auto counter = folderCounters.YmqCounters->GetCounter("queue.total_count", false);
- (*counter)++;
- }
+ Y_VERIFY(FolderCounters_);
+ FolderCounters_->InitCounters();
+ INC_COUNTER(FolderCounters_, total_count);
}
}
}
@@ -149,14 +147,8 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
TActivationContext::Send(new IEventHandle(LocalLeader_, SelfId(), new TEvPoisonPill()));
LocalLeader_ = TActorId();
if (FolderId_) {
- auto folderCounters = GetFolderCounters(UserCounters_->UserCounters, FolderId_);
- if (folderCounters.YmqCounters) {
- auto counter = folderCounters.YmqCounters->GetCounter("queue.total_count", false);
- counter->Dec();
- if (counter->Val() == 0) {
- RemoveFolderCounters(UserCounters_->UserCounters, FolderId_);
- }
- }
+ Y_VERIFY(FolderCounters_);
+ DEC_COUNTER(FolderCounters_, total_count);
}
}
}
@@ -188,6 +180,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
ui64 LeaderTabletId_ = 0;
TIntrusivePtr<TQueueCounters> Counters_;
TIntrusivePtr<TUserCounters> UserCounters_;
+ TIntrusivePtr<TFolderCounters> FolderCounters_;
TActorId PipeClient_;
TActorId LeaderPipeServer_;
TActorId LocalLeader_;
@@ -251,6 +244,7 @@ struct TSqsService::TUserInfo : public TAtomicRefCount<TUserInfo> {
std::shared_ptr<const std::map<TString, TString>> Settings_ = std::make_shared<const std::map<TString, TString>>();
TIntrusivePtr<TUserCounters> Counters_;
std::map<TString, TSqsService::TQueueInfoPtr> Queues_;
+ std::map<TString, TIntrusivePtr<TFolderCounters>> FolderCounters_;
THashMap<std::pair<TString, TString>, TSqsService::TQueueInfoPtr> QueueByNameAndFolder_; // <custom name, folder id> -> queue info
TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> QuoterResources_;
TLocalRateLimiterResource CreateObjectsQuoterResource_;
@@ -1024,7 +1018,10 @@ void TSqsService::RemoveQueue(const TString& userName, const TString& queue) {
queuePtr->GetLeaderNodeRequests_.clear();
LeaderTabletIdToQueue_.erase(queuePtr->LeaderTabletId_);
userIt->second->QueueByNameAndFolder_.erase(std::make_pair(queuePtr->CustomName_, queuePtr->FolderId_));
-
+ auto queuesCount = userIt->second->CountQueuesInFolder(queuePtr->FolderId_);
+ if (!queuesCount) {
+ userIt->second->FolderCounters_.erase(queuePtr->FolderId_);
+ }
userIt->second->Queues_.erase(queueIt);
queuePtr->Counters_->RemoveCounters();
}
@@ -1041,15 +1038,24 @@ std::map<TString, TSqsService::TQueueInfoPtr>::iterator TSqsService::AddQueue(co
const TInstant now = TActivationContext::Now();
const TInstant timeToInsertCounters = createdTimestamp + TDuration::MilliSeconds(Cfg().GetQueueCountersExportDelayMs());
const bool insertCounters = now >= timeToInsertCounters;
- auto ret = user->Queues_.insert(std::make_pair(queue, TQueueInfoPtr(new TQueueInfo(userName, queue, RootUrl_, leaderTabletId, customName, folderId, version, shardsCount, user->Counters_, SchemeCache_, user->QuoterResources_, insertCounters)))).first;
- auto queueInfo = ret->second;
- LeaderTabletIdToQueue_[leaderTabletId] = queueInfo;
- user->QueueByNameAndFolder_.emplace(std::make_pair(customName, folderId), queueInfo);
+ auto folderCntrIter = user->FolderCounters_.find(folderId);
+ if (folderCntrIter == user->FolderCounters_.end()) {
+ folderCntrIter = user->FolderCounters_.insert(std::make_pair(folderId, user->Counters_->CreateFolderCounters(folderId, true))).first;
+ }
if (!insertCounters) {
Schedule(timeToInsertCounters - now, new TSqsEvents::TEvInsertQueueCounters(userName, queue, leaderTabletId));
}
+ auto ret = user->Queues_.insert(std::make_pair(queue, TQueueInfoPtr(new TQueueInfo(
+ userName, queue, RootUrl_, leaderTabletId, customName, folderId, version, shardsCount,
+ user->Counters_, folderCntrIter->second, SchemeCache_, user->QuoterResources_, insertCounters)))
+ ).first;
+
+ auto queueInfo = ret->second;
+ LeaderTabletIdToQueue_[leaderTabletId] = queueInfo;
+ user->QueueByNameAndFolder_.emplace(std::make_pair(customName, folderId), queueInfo);
+
{
auto requests = user->GetLeaderNodeRequests_.equal_range(queue);
for (auto i = requests.first; i != requests.second; ++i) {
diff --git a/ydb/core/ymq/base/counters.cpp b/ydb/core/ymq/base/counters.cpp
index 90a9f5000f..36486efcca 100644
--- a/ydb/core/ymq/base/counters.cpp
+++ b/ydb/core/ymq/base/counters.cpp
@@ -377,7 +377,10 @@ ELaziness Lazy(const NKikimrConfig::TSqsConfig& cfg) {
#define INIT_COUNTERS_COUPLE_WITH_NAMES(rootCounters, sqsCounter, ymqCounter, sqsName, ymqName, expiring, valueType, lazy, aggr) \
sqsCounter.Init(rootCounters.SqsCounters, expiring, valueType, sqsName, lazy); \
if (rootCounters.YmqCounters && !aggr) { \
- ymqCounter.Init(rootCounters.YmqCounters, expiring, valueType, DEFAULT_YMQ_COUNTER_NAME, ymqName, ELaziness::OnStart); \
+ ymqCounter.Init( \
+ rootCounters.YmqCounters, ELifetime::Expiring, valueType, DEFAULT_YMQ_COUNTER_NAME, ymqName, \
+ ELaziness::OnStart \
+ ); \
}
#define INIT_COUNTER(rootCounters, variable, expiring, valueType, lazy) \
@@ -389,7 +392,7 @@ ELaziness Lazy(const NKikimrConfig::TSqsConfig& cfg) {
#define INIT_HISTOGRAMS_COUPLE_WITH_NAMES(rootCounters, sqsHistogram, ymqHistogram, sqsName, ymqName, expiring, sqsBuckets, ymqBuckets, lazy, aggr) \
sqsHistogram.Init(rootCounters.SqsCounters, expiring, sqsBuckets, sqsName, lazy); \
if (rootCounters.YmqCounters && !aggr) { \
- ymqHistogram.Init(rootCounters.YmqCounters, expiring, ymqBuckets, DEFAULT_YMQ_COUNTER_NAME, ymqName, lazy); \
+ ymqHistogram.Init(rootCounters.YmqCounters, ELifetime::Expiring, ymqBuckets, DEFAULT_YMQ_COUNTER_NAME, ymqName, lazy); \
}
#define INIT_HISTOGRAMS_COUPLE_WITH_BUCKETS(rootCounters, sqsHistogram, ymqHistogram, expiring, sqsBuckets, ymqBuckets, lazy, aggr) \
@@ -425,10 +428,10 @@ void TYmqActionCounters::Init(
SubGroup = rootCounters->GetSubgroup(labelName, methodName);
INIT_COUNTER_WITH_NAME_AND_LABEL(
SubGroup, Success, DEFAULT_YMQ_COUNTER_NAME, TStringBuilder() << namePrefix << "requests_count_per_second",
- lifetime,EValueType::Derivative, ELaziness::OnStart);
+ lifetime, EValueType::Derivative, ELaziness::OnStart);
INIT_COUNTER_WITH_NAME_AND_LABEL(
SubGroup, Errors, DEFAULT_YMQ_COUNTER_NAME, TStringBuilder() << namePrefix << "errors_count_per_second",
- lifetime,EValueType::Derivative, ELaziness::OnStart
+ lifetime, EValueType::Derivative, ELaziness::OnStart
);
// ! - No inflight counter
@@ -513,6 +516,49 @@ void TAPIStatusesCounters::SetAggregatedParent(TAPIStatusesCounters* parent) {
UnknownCounter.SetAggregatedParent(parent ? &parent->UnknownCounter : nullptr);
}
+TFolderCounters::TFolderCounters(const TUserCounters* userCounters, const TString& folderId, bool insertCounters)
+ : UserCounters(userCounters->UserCounters)
+ , FolderId(folderId)
+{
+ if (insertCounters) {
+ FolderCounters = GetFolderCounters(UserCounters, folderId);
+ } else {
+ FolderCounters = {new NMonitoring::TDynamicCounters(), new NMonitoring::TDynamicCounters()};
+ }
+ //InitCounters();
+}
+
+void TFolderCounters::InitCounters() {
+ if (Inited) {
+ return;
+ }
+ Inited = true;
+
+ InsertCounters();
+ if (UserCounters.YmqCounters) {
+ total_count.Init(
+ FolderCounters.YmqCounters, ELifetime::Expiring, EValueType::Absolute, DEFAULT_COUNTER_NAME,
+ "queue.total_count",
+ ELaziness::OnStart
+ );
+ }
+}
+
+
+void TFolderCounters::InsertCounters() {
+
+ if (UserCounters.Defined()) {
+ if (!UserCounters.SqsCounters->FindSubgroup(FOLDER_LABEL, FolderId)) {
+ FolderCounters.SqsCounters->ResetCounters();
+ UserCounters.SqsCounters->RegisterSubgroup(FOLDER_LABEL, FolderId, FolderCounters.SqsCounters);
+ }
+ if (UserCounters.YmqCounters && !UserCounters.YmqCounters->FindSubgroup(FOLDER_LABEL, FolderId)) {
+ FolderCounters.YmqCounters->ResetCounters();
+ UserCounters.YmqCounters->RegisterSubgroup(FOLDER_LABEL, FolderId, FolderCounters.YmqCounters);
+ }
+ }
+}
+
TQueueCounters::TQueueCounters(const NKikimrConfig::TSqsConfig& cfg,
const TIntrusivePtrCntrCouple& rootCounters,
const TUserCounters* userCounters,
@@ -790,6 +836,10 @@ void TUserCounters::TDetailedCounters::SetAggregatedParent(TUserCounters::TDetai
UserDetailedCountersDescriptor.SetAggregatedParent(this, parent);
}
+TIntrusivePtr<TFolderCounters> TUserCounters::CreateFolderCounters(const TString& folderId, bool insertCounters) {
+ return new TFolderCounters(this, folderId, insertCounters);
+}
+
TIntrusivePtr<TQueueCounters> TUserCounters::CreateQueueCounters(const TString& queueName, const TString& folderId, bool insertCounters) {
auto counters = CreateQueueCountersImpl(queueName, folderId, insertCounters, IsAggregatedCounters);
counters->SetAggregatedParent(AggregatedQueueCounters);
diff --git a/ydb/core/ymq/base/counters.h b/ydb/core/ymq/base/counters.h
index 3c2667b241..8851cdff71 100644
--- a/ydb/core/ymq/base/counters.h
+++ b/ydb/core/ymq/base/counters.h
@@ -20,6 +20,7 @@
namespace NKikimr::NSQS {
struct TUserCounters;
+struct TFolderCounters;
struct TQueueCounters;
struct THttpCounters;
struct TCloudAuthCounters;
@@ -508,6 +509,7 @@ struct TUserCounters : public TAtomicRefCount<TUserCounters> {
}
TIntrusivePtr<TQueueCounters> CreateQueueCounters(const TString& queueName, const TString& folderId, bool insertCounters);
+ TIntrusivePtr<TFolderCounters> CreateFolderCounters(const TString& folderId, bool insertCounters);
void RemoveCounters();
@@ -554,6 +556,23 @@ private:
bool IsAggregatedCounters;
};
+struct TFolderCounters : public TAtomicRefCount<TFolderCounters> {
+ TLazyCachedCounter total_count; // Total queues in folder.
+
+ TFolderCounters(const TUserCounters* userCounters, const TString& folderId, bool insertCounters);
+
+ void InitCounters();
+ void InsertCounters();
+
+private:
+ TIntrusivePtrCntrCouple UserCounters; // User tree in core subsystem
+ TIntrusivePtrCntrCouple FolderCounters; // Folder tree in core subsystem
+
+ const TString FolderId;
+ bool Inited = false;
+
+};
+
// Queue counters in SQS core subsystem.
struct TQueueCounters : public TAtomicRefCount<TQueueCounters> {
// Action types are declared in ydb/core/ymq/base/action.h.