aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-08 17:20:49 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-08 17:20:49 +0300
commit8bfba786a00c488c54b9950cea95da5a28e99be9 (patch)
tree1667a589e5890ee421225a5660051142f42acfe8
parent723a9414949d935da94e2df3ce0b64d5c5497c8e (diff)
downloadydb-8bfba786a00c488c54b9950cea95da5a28e99be9.tar.gz
add aggregation signals
-rw-r--r--ydb/core/tx/columnshard/counters/common/agent.cpp9
-rw-r--r--ydb/core/tx/columnshard/counters/common/agent.h3
-rw-r--r--ydb/core/tx/columnshard/counters/common/client.cpp4
-rw-r--r--ydb/core/tx/columnshard/counters/common/client.h1
-rw-r--r--ydb/core/tx/columnshard/counters/engine_logs.cpp1
-rw-r--r--ydb/core/tx/columnshard/counters/engine_logs.h111
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.cpp17
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h35
9 files changed, 151 insertions, 32 deletions
diff --git a/ydb/core/tx/columnshard/counters/common/agent.cpp b/ydb/core/tx/columnshard/counters/common/agent.cpp
index b86c4d92fcf..e34fe416ca3 100644
--- a/ydb/core/tx/columnshard/counters/common/agent.cpp
+++ b/ydb/core/tx/columnshard/counters/common/agent.cpp
@@ -11,14 +11,16 @@ TValueAggregationAgent::TValueAggregationAgent(const TString& signalName, const
}
-bool TValueAggregationAgent::CalcAggregations(i64& minValue, i64& maxValue) const {
+bool TValueAggregationAgent::CalcAggregations(i64& sum, i64& minValue, i64& maxValue) const {
const ui32 count = Values.size();
if (!count) {
return false;
}
+ sum = 0;
minValue = Values.front();
maxValue = Values.front();
for (ui32 i = 0; i < count; ++i) {
+ sum += Values[i];
if (minValue > Values[i]) {
minValue = Values[i];
}
@@ -30,12 +32,13 @@ bool TValueAggregationAgent::CalcAggregations(i64& minValue, i64& maxValue) cons
}
std::optional<NKikimr::NColumnShard::TSignalAggregations> TValueAggregationAgent::GetAggregations() const {
+ i64 sum;
i64 min;
i64 max;
- if (!CalcAggregations(min, max)) {
+ if (!CalcAggregations(sum, min, max)) {
return {};
}
- return TSignalAggregations(SumValue, min, max);
+ return TSignalAggregations(sum, min, max);
}
void TValueAggregationAgent::ResendStatus() const {
diff --git a/ydb/core/tx/columnshard/counters/common/agent.h b/ydb/core/tx/columnshard/counters/common/agent.h
index 9aa2b908ebc..c10ecb5dee8 100644
--- a/ydb/core/tx/columnshard/counters/common/agent.h
+++ b/ydb/core/tx/columnshard/counters/common/agent.h
@@ -26,10 +26,9 @@ private:
::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalMin;
::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalMax;
std::deque<i64> Values;
- i64 SumValue;
TMutex Mutex;
- bool CalcAggregations(i64& minValue, i64& maxValue) const;
+ bool CalcAggregations(i64& sum, i64& minValue, i64& maxValue) const;
std::optional<TSignalAggregations> GetAggregations() const;
public:
diff --git a/ydb/core/tx/columnshard/counters/common/client.cpp b/ydb/core/tx/columnshard/counters/common/client.cpp
index 95c400b35e2..71e18722a63 100644
--- a/ydb/core/tx/columnshard/counters/common/client.cpp
+++ b/ydb/core/tx/columnshard/counters/common/client.cpp
@@ -14,4 +14,8 @@ void TValueAggregationClient::Set(const i64 value) const {
*ValuePtr = value;
}
+TValueAggregationClient::~TValueAggregationClient() {
+ Set(0);
+}
+
}
diff --git a/ydb/core/tx/columnshard/counters/common/client.h b/ydb/core/tx/columnshard/counters/common/client.h
index 0c8f5a01075..f9fba2f5c29 100644
--- a/ydb/core/tx/columnshard/counters/common/client.h
+++ b/ydb/core/tx/columnshard/counters/common/client.h
@@ -11,6 +11,7 @@ private:
i64* ValuePtr = nullptr;
public:
TValueAggregationClient(std::shared_ptr<TValueAggregationAgent> owner);
+ ~TValueAggregationClient();
void Set(const i64 value) const;
};
diff --git a/ydb/core/tx/columnshard/counters/engine_logs.cpp b/ydb/core/tx/columnshard/counters/engine_logs.cpp
index cc208742fa0..25b84517577 100644
--- a/ydb/core/tx/columnshard/counters/engine_logs.cpp
+++ b/ydb/core/tx/columnshard/counters/engine_logs.cpp
@@ -6,6 +6,7 @@ namespace NKikimr::NColumnShard {
TEngineLogsCounters::TEngineLogsCounters()
: TBase("EngineLogs")
+ , GranuleDataAgent("EngineLogs")
{
OverloadGranules = TBase::GetValue("Granules/Overload");
CompactOverloadGranulesSelection = TBase::GetDeriviative("Granules/Selection/Overload/Count");
diff --git a/ydb/core/tx/columnshard/counters/engine_logs.h b/ydb/core/tx/columnshard/counters/engine_logs.h
index 6beb40308a0..cb33f37fb48 100644
--- a/ydb/core/tx/columnshard/counters/engine_logs.h
+++ b/ydb/core/tx/columnshard/counters/engine_logs.h
@@ -4,6 +4,111 @@
namespace NKikimr::NColumnShard {
+class TBaseGranuleDataClassSummary {
+protected:
+ i64 PortionsSize = 0;
+ i64 MaxColumnsSize = 0;
+ i64 PortionsCount = 0;
+ i64 RecordsCount = 0;
+public:
+ i64 GetPortionsSize() const {
+ return PortionsSize;
+ }
+ i64 GetRecordsCount() const {
+ return RecordsCount;
+ }
+ i64 GetMaxColumnsSize() const {
+ return MaxColumnsSize;
+ }
+ i64 GetPortionsCount() const {
+ return PortionsCount;
+ }
+};
+
+class TDataClassCounters {
+private:
+ std::shared_ptr<TValueAggregationClient> PortionsSize;
+ std::shared_ptr<TValueAggregationClient> PortionsCount;
+public:
+ TDataClassCounters(const std::shared_ptr<TValueAggregationClient>& portionsSize, const std::shared_ptr<TValueAggregationClient>& portionsCount)
+ : PortionsSize(portionsSize)
+ , PortionsCount(portionsCount)
+ {
+
+ }
+
+ void OnPortionsInfo(const TBaseGranuleDataClassSummary& dataInfo) const {
+ PortionsSize->Set(dataInfo.GetPortionsSize());
+ PortionsCount->Set(dataInfo.GetPortionsCount());
+ }
+
+ void OnPortionsInfo(const ui64 size, const ui32 chunks) const {
+ PortionsSize->Set(size);
+ PortionsCount->Set(chunks);
+ }
+};
+
+class TAgentDataClassCounters: public TCommonCountersOwner {
+private:
+ using TBase = TCommonCountersOwner;
+ std::shared_ptr<TValueAggregationAgent> PortionsSize;
+ std::shared_ptr<TValueAggregationAgent> PortionsCount;
+public:
+ TAgentDataClassCounters(const TString& baseName, const TString& signalId)
+ : TBase(baseName)
+ {
+ PortionsSize = TBase::GetValueAutoAggregations(signalId + "/Bytes");
+ PortionsCount = TBase::GetValueAutoAggregations(signalId + "/Chunks");
+ }
+
+ TDataClassCounters RegisterClient() const {
+ return TDataClassCounters(PortionsSize->GetClient(PortionsSize), PortionsCount->GetClient(PortionsCount));
+ }
+};
+
+class TGranuleDataCounters {
+private:
+ const TDataClassCounters InsertedData;
+ const TDataClassCounters CompactedData;
+ const TDataClassCounters FullData;
+public:
+ TGranuleDataCounters(const TDataClassCounters& insertedData, const TDataClassCounters& compactedData, const TDataClassCounters& fullData)
+ : InsertedData(insertedData)
+ , CompactedData(compactedData)
+ , FullData(fullData)
+ {
+ }
+
+ void OnFullData(const ui64 size, const ui32 chunksCount) const {
+ FullData.OnPortionsInfo(size, chunksCount);
+ }
+
+ void OnInsertedData(const TBaseGranuleDataClassSummary& dataInfo) const {
+ InsertedData.OnPortionsInfo(dataInfo);
+ }
+
+ void OnCompactedData(const TBaseGranuleDataClassSummary& dataInfo) const {
+ CompactedData.OnPortionsInfo(dataInfo);
+ }
+};
+
+class TAgentGranuleDataCounters {
+private:
+ TAgentDataClassCounters InsertedData;
+ TAgentDataClassCounters CompactedData;
+ TAgentDataClassCounters FullData;
+public:
+ TAgentGranuleDataCounters(const TString& ownerId)
+ : InsertedData(ownerId, "ByGranule/Inserted")
+ , CompactedData(ownerId, "ByGranule/Compacted")
+ , FullData(ownerId, "ByGranule/Full") {
+ }
+
+ TGranuleDataCounters RegisterClient() const {
+ return TGranuleDataCounters(InsertedData.RegisterClient(), CompactedData.RegisterClient(), FullData.RegisterClient());
+ }
+};
+
class TEngineLogsCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
@@ -18,6 +123,8 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr PortionNoBorderCount;
NMonitoring::TDynamicCounters::TCounterPtr PortionNoBorderBytes;
+
+ TAgentGranuleDataCounters GranuleDataAgent;
public:
NMonitoring::TDynamicCounters::TCounterPtr OverloadGranules;
NMonitoring::TDynamicCounters::TCounterPtr CompactOverloadGranulesSelection;
@@ -25,6 +132,10 @@ public:
NMonitoring::TDynamicCounters::TCounterPtr SplitCompactGranulesSelection;
NMonitoring::TDynamicCounters::TCounterPtr InternalCompactGranulesSelection;
+ TGranuleDataCounters RegisterGranuleDataCounters() const {
+ return GranuleDataAgent.RegisterClient();
+ }
+
void OnPortionToEvict(const ui64 size) const {
PortionToEvictCount->Add(1);
PortionToEvictBytes->Add(size);
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index f838be03282..a32396dd0c5 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -946,7 +946,7 @@ bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) {
Y_VERIFY(PathGranules[rec.PathId].emplace(mark, rec.Granule).second);
// Allocate granule info and ensure that there is no granule with same id inserted before.
- Y_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage)).second);
+ Y_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters())).second);
} else {
// Granule with same id already exists.
if (Granules.contains(rec.Granule)) {
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index f41202c8cfc..3c39702af55 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -64,6 +64,7 @@ void TGranuleMeta::OnBeforeChangePortion(const TPortionInfo* portionBefore, cons
if (portionAfter && portionAfter->IsActive()) {
AdditiveSummaryCache->AddPortion(*portionAfter);
}
+ OnAdditiveSummaryChange();
}
}
@@ -152,4 +153,20 @@ void TGranuleMeta::RebuildAdditiveMetrics() const {
AdditiveSummaryCache = result;
}
+const NKikimr::NOlap::TGranuleAdditiveSummary& TGranuleMeta::GetAdditiveSummary() const {
+ if (!AdditiveSummaryCache) {
+ RebuildAdditiveMetrics();
+ OnAdditiveSummaryChange();
+ }
+ return *AdditiveSummaryCache;
+}
+
+void TGranuleMeta::OnAdditiveSummaryChange() const {
+ if (AdditiveSummaryCache) {
+ Counters.OnCompactedData(AdditiveSummaryCache->GetOther());
+ Counters.OnInsertedData(AdditiveSummaryCache->GetInserted());
+ Counters.OnFullData(AdditiveSummaryCache->GetGranuleSize(), AdditiveSummaryCache->GetActivePortionsCount());
+ }
+}
+
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 054f73de995..677a3bdf3b9 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -1,4 +1,5 @@
#pragma once
+#include <ydb/core/tx/columnshard/counters/engine_logs.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/tx/columnshard/engines/portion_info.h>
@@ -30,27 +31,10 @@ public:
}
};
-class TDataClassSummary {
+class TDataClassSummary: public NColumnShard::TBaseGranuleDataClassSummary {
private:
- i64 PortionsSize = 0;
- i64 MaxColumnsSize = 0;
- i64 PortionsCount = 0;
- i64 RecordsCount = 0;
friend class TGranuleMeta;
public:
- i64 GetPortionsSize() const {
- return PortionsSize;
- }
- i64 GetRecordsCount() const {
- return RecordsCount;
- }
- i64 GetMaxColumnsSize() const {
- return MaxColumnsSize;
- }
- i64 GetPortionsCount() const {
- return PortionsCount;
- }
-
void AddPortion(const TPortionInfo& info) {
const auto sizes = info.BlobsSizes();
PortionsSize += sizes.first;
@@ -219,9 +203,11 @@ private:
TCompactionPriorityInfo CompactionPriorityInfo;
mutable bool AllowInsertionFlag = false;
std::shared_ptr<TGranulesStorage> Owner;
+ const NColumnShard::TGranuleDataCounters Counters;
void OnBeforeChangePortion(const TPortionInfo* portionBefore, const TPortionInfo* portionAfter);
void OnAfterChangePortion();
+ void OnAdditiveSummaryChange() const;
public:
const TGranuleHardSummary& GetHardSummary() const {
if (!HardSummaryCache) {
@@ -229,12 +215,7 @@ public:
}
return *HardSummaryCache;
}
- const TGranuleAdditiveSummary& GetAdditiveSummary() const {
- if (!AdditiveSummaryCache) {
- RebuildAdditiveMetrics();
- }
- return *AdditiveSummaryCache;
- }
+ const TGranuleAdditiveSummary& GetAdditiveSummary() const;
TCompactionPriority GetCompactionPriority() const {
return TCompactionPriority(CompactionPriorityInfo, GetAdditiveSummary());
}
@@ -304,9 +285,11 @@ public:
bool ErasePortion(const ui64 portion);
- explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner)
+ explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner, const NColumnShard::TGranuleDataCounters& counters)
: Owner(owner)
- , Record(rec) {
+ , Counters(counters)
+ , Record(rec)
+ {
}
ui64 GetGranuleId() const {