diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-08 17:20:49 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-08 17:20:49 +0300 |
commit | 8bfba786a00c488c54b9950cea95da5a28e99be9 (patch) | |
tree | 1667a589e5890ee421225a5660051142f42acfe8 | |
parent | 723a9414949d935da94e2df3ce0b64d5c5497c8e (diff) | |
download | ydb-8bfba786a00c488c54b9950cea95da5a28e99be9.tar.gz |
add aggregation signals
-rw-r--r-- | ydb/core/tx/columnshard/counters/common/agent.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/common/agent.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/common/client.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/common/client.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/engine_logs.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/engine_logs.h | 111 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/storage/granule.cpp | 17 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/storage/granule.h | 35 |
9 files changed, 151 insertions, 32 deletions
diff --git a/ydb/core/tx/columnshard/counters/common/agent.cpp b/ydb/core/tx/columnshard/counters/common/agent.cpp index b86c4d92fcf..e34fe416ca3 100644 --- a/ydb/core/tx/columnshard/counters/common/agent.cpp +++ b/ydb/core/tx/columnshard/counters/common/agent.cpp @@ -11,14 +11,16 @@ TValueAggregationAgent::TValueAggregationAgent(const TString& signalName, const } -bool TValueAggregationAgent::CalcAggregations(i64& minValue, i64& maxValue) const { +bool TValueAggregationAgent::CalcAggregations(i64& sum, i64& minValue, i64& maxValue) const { const ui32 count = Values.size(); if (!count) { return false; } + sum = 0; minValue = Values.front(); maxValue = Values.front(); for (ui32 i = 0; i < count; ++i) { + sum += Values[i]; if (minValue > Values[i]) { minValue = Values[i]; } @@ -30,12 +32,13 @@ bool TValueAggregationAgent::CalcAggregations(i64& minValue, i64& maxValue) cons } std::optional<NKikimr::NColumnShard::TSignalAggregations> TValueAggregationAgent::GetAggregations() const { + i64 sum; i64 min; i64 max; - if (!CalcAggregations(min, max)) { + if (!CalcAggregations(sum, min, max)) { return {}; } - return TSignalAggregations(SumValue, min, max); + return TSignalAggregations(sum, min, max); } void TValueAggregationAgent::ResendStatus() const { diff --git a/ydb/core/tx/columnshard/counters/common/agent.h b/ydb/core/tx/columnshard/counters/common/agent.h index 9aa2b908ebc..c10ecb5dee8 100644 --- a/ydb/core/tx/columnshard/counters/common/agent.h +++ b/ydb/core/tx/columnshard/counters/common/agent.h @@ -26,10 +26,9 @@ private: ::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalMin; ::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalMax; std::deque<i64> Values; - i64 SumValue; TMutex Mutex; - bool CalcAggregations(i64& minValue, i64& maxValue) const; + bool CalcAggregations(i64& sum, i64& minValue, i64& maxValue) const; std::optional<TSignalAggregations> GetAggregations() const; public: diff --git a/ydb/core/tx/columnshard/counters/common/client.cpp b/ydb/core/tx/columnshard/counters/common/client.cpp index 95c400b35e2..71e18722a63 100644 --- a/ydb/core/tx/columnshard/counters/common/client.cpp +++ b/ydb/core/tx/columnshard/counters/common/client.cpp @@ -14,4 +14,8 @@ void TValueAggregationClient::Set(const i64 value) const { *ValuePtr = value; } +TValueAggregationClient::~TValueAggregationClient() { + Set(0); +} + } diff --git a/ydb/core/tx/columnshard/counters/common/client.h b/ydb/core/tx/columnshard/counters/common/client.h index 0c8f5a01075..f9fba2f5c29 100644 --- a/ydb/core/tx/columnshard/counters/common/client.h +++ b/ydb/core/tx/columnshard/counters/common/client.h @@ -11,6 +11,7 @@ private: i64* ValuePtr = nullptr; public: TValueAggregationClient(std::shared_ptr<TValueAggregationAgent> owner); + ~TValueAggregationClient(); void Set(const i64 value) const; }; diff --git a/ydb/core/tx/columnshard/counters/engine_logs.cpp b/ydb/core/tx/columnshard/counters/engine_logs.cpp index cc208742fa0..25b84517577 100644 --- a/ydb/core/tx/columnshard/counters/engine_logs.cpp +++ b/ydb/core/tx/columnshard/counters/engine_logs.cpp @@ -6,6 +6,7 @@ namespace NKikimr::NColumnShard { TEngineLogsCounters::TEngineLogsCounters() : TBase("EngineLogs") + , GranuleDataAgent("EngineLogs") { OverloadGranules = TBase::GetValue("Granules/Overload"); CompactOverloadGranulesSelection = TBase::GetDeriviative("Granules/Selection/Overload/Count"); diff --git a/ydb/core/tx/columnshard/counters/engine_logs.h b/ydb/core/tx/columnshard/counters/engine_logs.h index 6beb40308a0..cb33f37fb48 100644 --- a/ydb/core/tx/columnshard/counters/engine_logs.h +++ b/ydb/core/tx/columnshard/counters/engine_logs.h @@ -4,6 +4,111 @@ namespace NKikimr::NColumnShard { +class TBaseGranuleDataClassSummary { +protected: + i64 PortionsSize = 0; + i64 MaxColumnsSize = 0; + i64 PortionsCount = 0; + i64 RecordsCount = 0; +public: + i64 GetPortionsSize() const { + return PortionsSize; + } + i64 GetRecordsCount() const { + return RecordsCount; + } + i64 GetMaxColumnsSize() const { + return MaxColumnsSize; + } + i64 GetPortionsCount() const { + return PortionsCount; + } +}; + +class TDataClassCounters { +private: + std::shared_ptr<TValueAggregationClient> PortionsSize; + std::shared_ptr<TValueAggregationClient> PortionsCount; +public: + TDataClassCounters(const std::shared_ptr<TValueAggregationClient>& portionsSize, const std::shared_ptr<TValueAggregationClient>& portionsCount) + : PortionsSize(portionsSize) + , PortionsCount(portionsCount) + { + + } + + void OnPortionsInfo(const TBaseGranuleDataClassSummary& dataInfo) const { + PortionsSize->Set(dataInfo.GetPortionsSize()); + PortionsCount->Set(dataInfo.GetPortionsCount()); + } + + void OnPortionsInfo(const ui64 size, const ui32 chunks) const { + PortionsSize->Set(size); + PortionsCount->Set(chunks); + } +}; + +class TAgentDataClassCounters: public TCommonCountersOwner { +private: + using TBase = TCommonCountersOwner; + std::shared_ptr<TValueAggregationAgent> PortionsSize; + std::shared_ptr<TValueAggregationAgent> PortionsCount; +public: + TAgentDataClassCounters(const TString& baseName, const TString& signalId) + : TBase(baseName) + { + PortionsSize = TBase::GetValueAutoAggregations(signalId + "/Bytes"); + PortionsCount = TBase::GetValueAutoAggregations(signalId + "/Chunks"); + } + + TDataClassCounters RegisterClient() const { + return TDataClassCounters(PortionsSize->GetClient(PortionsSize), PortionsCount->GetClient(PortionsCount)); + } +}; + +class TGranuleDataCounters { +private: + const TDataClassCounters InsertedData; + const TDataClassCounters CompactedData; + const TDataClassCounters FullData; +public: + TGranuleDataCounters(const TDataClassCounters& insertedData, const TDataClassCounters& compactedData, const TDataClassCounters& fullData) + : InsertedData(insertedData) + , CompactedData(compactedData) + , FullData(fullData) + { + } + + void OnFullData(const ui64 size, const ui32 chunksCount) const { + FullData.OnPortionsInfo(size, chunksCount); + } + + void OnInsertedData(const TBaseGranuleDataClassSummary& dataInfo) const { + InsertedData.OnPortionsInfo(dataInfo); + } + + void OnCompactedData(const TBaseGranuleDataClassSummary& dataInfo) const { + CompactedData.OnPortionsInfo(dataInfo); + } +}; + +class TAgentGranuleDataCounters { +private: + TAgentDataClassCounters InsertedData; + TAgentDataClassCounters CompactedData; + TAgentDataClassCounters FullData; +public: + TAgentGranuleDataCounters(const TString& ownerId) + : InsertedData(ownerId, "ByGranule/Inserted") + , CompactedData(ownerId, "ByGranule/Compacted") + , FullData(ownerId, "ByGranule/Full") { + } + + TGranuleDataCounters RegisterClient() const { + return TGranuleDataCounters(InsertedData.RegisterClient(), CompactedData.RegisterClient(), FullData.RegisterClient()); + } +}; + class TEngineLogsCounters: public TCommonCountersOwner { private: using TBase = TCommonCountersOwner; @@ -18,6 +123,8 @@ private: NMonitoring::TDynamicCounters::TCounterPtr PortionNoBorderCount; NMonitoring::TDynamicCounters::TCounterPtr PortionNoBorderBytes; + + TAgentGranuleDataCounters GranuleDataAgent; public: NMonitoring::TDynamicCounters::TCounterPtr OverloadGranules; NMonitoring::TDynamicCounters::TCounterPtr CompactOverloadGranulesSelection; @@ -25,6 +132,10 @@ public: NMonitoring::TDynamicCounters::TCounterPtr SplitCompactGranulesSelection; NMonitoring::TDynamicCounters::TCounterPtr InternalCompactGranulesSelection; + TGranuleDataCounters RegisterGranuleDataCounters() const { + return GranuleDataAgent.RegisterClient(); + } + void OnPortionToEvict(const ui64 size) const { PortionToEvictCount->Add(1); PortionToEvictBytes->Add(size); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index f838be03282..a32396dd0c5 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -946,7 +946,7 @@ bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) { Y_VERIFY(PathGranules[rec.PathId].emplace(mark, rec.Granule).second); // Allocate granule info and ensure that there is no granule with same id inserted before. - Y_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage)).second); + Y_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters())).second); } else { // Granule with same id already exists. if (Granules.contains(rec.Granule)) { diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index f41202c8cfc..3c39702af55 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -64,6 +64,7 @@ void TGranuleMeta::OnBeforeChangePortion(const TPortionInfo* portionBefore, cons if (portionAfter && portionAfter->IsActive()) { AdditiveSummaryCache->AddPortion(*portionAfter); } + OnAdditiveSummaryChange(); } } @@ -152,4 +153,20 @@ void TGranuleMeta::RebuildAdditiveMetrics() const { AdditiveSummaryCache = result; } +const NKikimr::NOlap::TGranuleAdditiveSummary& TGranuleMeta::GetAdditiveSummary() const { + if (!AdditiveSummaryCache) { + RebuildAdditiveMetrics(); + OnAdditiveSummaryChange(); + } + return *AdditiveSummaryCache; +} + +void TGranuleMeta::OnAdditiveSummaryChange() const { + if (AdditiveSummaryCache) { + Counters.OnCompactedData(AdditiveSummaryCache->GetOther()); + Counters.OnInsertedData(AdditiveSummaryCache->GetInserted()); + Counters.OnFullData(AdditiveSummaryCache->GetGranuleSize(), AdditiveSummaryCache->GetActivePortionsCount()); + } +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 054f73de995..677a3bdf3b9 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -1,4 +1,5 @@ #pragma once +#include <ydb/core/tx/columnshard/counters/engine_logs.h> #include <ydb/core/tx/columnshard/engines/column_engine.h> #include <ydb/core/tx/columnshard/engines/portion_info.h> @@ -30,27 +31,10 @@ public: } }; -class TDataClassSummary { +class TDataClassSummary: public NColumnShard::TBaseGranuleDataClassSummary { private: - i64 PortionsSize = 0; - i64 MaxColumnsSize = 0; - i64 PortionsCount = 0; - i64 RecordsCount = 0; friend class TGranuleMeta; public: - i64 GetPortionsSize() const { - return PortionsSize; - } - i64 GetRecordsCount() const { - return RecordsCount; - } - i64 GetMaxColumnsSize() const { - return MaxColumnsSize; - } - i64 GetPortionsCount() const { - return PortionsCount; - } - void AddPortion(const TPortionInfo& info) { const auto sizes = info.BlobsSizes(); PortionsSize += sizes.first; @@ -219,9 +203,11 @@ private: TCompactionPriorityInfo CompactionPriorityInfo; mutable bool AllowInsertionFlag = false; std::shared_ptr<TGranulesStorage> Owner; + const NColumnShard::TGranuleDataCounters Counters; void OnBeforeChangePortion(const TPortionInfo* portionBefore, const TPortionInfo* portionAfter); void OnAfterChangePortion(); + void OnAdditiveSummaryChange() const; public: const TGranuleHardSummary& GetHardSummary() const { if (!HardSummaryCache) { @@ -229,12 +215,7 @@ public: } return *HardSummaryCache; } - const TGranuleAdditiveSummary& GetAdditiveSummary() const { - if (!AdditiveSummaryCache) { - RebuildAdditiveMetrics(); - } - return *AdditiveSummaryCache; - } + const TGranuleAdditiveSummary& GetAdditiveSummary() const; TCompactionPriority GetCompactionPriority() const { return TCompactionPriority(CompactionPriorityInfo, GetAdditiveSummary()); } @@ -304,9 +285,11 @@ public: bool ErasePortion(const ui64 portion); - explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner) + explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters) : Owner(owner) - , Record(rec) { + , Counters(counters) + , Record(rec) + { } ui64 GetGranuleId() const { |