aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-10-20 17:35:44 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-10-20 18:56:53 +0300
commit3ccfbd6aca63fc40ba20bc9ab2840049d9bdd0fd (patch)
tree12e1ed2e7f382c457f2d5bb190772c6c092d3be2
parentfc0e05b8840f83b3cacf286352c4c8f741145176 (diff)
downloadydb-3ccfbd6aca63fc40ba20bc9ab2840049d9bdd0fd.tar.gz
KIKIMR-19211: simple granules rating for actualization usage
-rw-r--r--ydb/core/tx/columnshard/counters/engine_logs.cpp33
-rw-r--r--ydb/core/tx/columnshard/counters/engine_logs.h36
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.cpp47
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.cpp17
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h35
-rw-r--r--ydb/core/tx/columnshard/engines/storage/storage.cpp55
-rw-r--r--ydb/core/tx/columnshard/engines/storage/storage.h2
8 files changed, 133 insertions, 96 deletions
diff --git a/ydb/core/tx/columnshard/counters/engine_logs.cpp b/ydb/core/tx/columnshard/counters/engine_logs.cpp
index 7e0df8d3e67..a4e06b8644d 100644
--- a/ydb/core/tx/columnshard/counters/engine_logs.cpp
+++ b/ydb/core/tx/columnshard/counters/engine_logs.cpp
@@ -1,6 +1,7 @@
#include "engine_logs.h"
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/counters.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <util/generic/serialized_enum.h>
namespace NKikimr::NColumnShard {
@@ -13,11 +14,23 @@ TEngineLogsCounters::TEngineLogsCounters()
{2 * 1024 * 1024, "2Mb"}, {4 * 1024 * 1024, "4Mb"},
{5 * 1024 * 1024, "5Mb"}, {6 * 1024 * 1024, "6Mb"},
{7 * 1024 * 1024, "7Mb"}, {8 * 1024 * 1024, "8Mb"}};
+ const std::map<i64, TString> portionSizeBorders = {{0, "0"}, {512 * 1024, "512kb"}, {1024 * 1024, "1Mb"},
+ {2 * 1024 * 1024, "2Mb"}, {4 * 1024 * 1024, "4Mb"},
+ {8 * 1024 * 1024, "8Mb"}, {16 * 1024 * 1024, "16Mb"},
+ {32 * 1024 * 1024, "32Mb"}, {64 * 1024 * 1024, "64Mb"}};
+ const std::set<i64> portionRecordBorders = {0, 2500, 5000, 7500, 9000, 10000, 20000, 40000, 80000, 160000};
for (auto&& i : GetEnumNames<NOlap::NPortion::EProduced>()) {
if (BlobSizeDistribution.size() <= (ui32)i.first) {
BlobSizeDistribution.resize((ui32)i.first + 1);
+ PortionSizeDistribution.resize((ui32)i.first + 1);
+ PortionRecordsDistribution.resize((ui32)i.first + 1);
+ }
+ if (PortionSizeDistribution.size() <= (ui32)i.first) {
+ PortionSizeDistribution.resize((ui32)i.first + 1);
}
BlobSizeDistribution[(ui32)i.first] = std::make_shared<TIncrementalHistogram>("EngineLogs", "BlobSizeDistribution", i.second, borders);
+ PortionSizeDistribution[(ui32)i.first] = std::make_shared<TIncrementalHistogram>("EngineLogs", "PortionSizeDistribution", i.second, portionSizeBorders);
+ PortionRecordsDistribution[(ui32)i.first] = std::make_shared<TIncrementalHistogram>("EngineLogs", "PortionRecordsDistribution", i.second, portionRecordBorders);
}
for (auto&& i : BlobSizeDistribution) {
Y_ABORT_UNLESS(i);
@@ -41,4 +54,24 @@ TEngineLogsCounters::TEngineLogsCounters()
PortionNoBorderBytes = TBase::GetDeriviative("Ttl/PortionNoBorder/Bytes");
}
+void TEngineLogsCounters::TPortionsInfoGuard::OnNewPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const {
+ const ui32 producedId = (ui32)(portion->HasRemoveSnapshot() ? NOlap::NPortion::EProduced::INACTIVE : portion->GetMeta().Produced);
+ Y_ABORT_UNLESS(producedId < BlobGuards.size());
+ for (auto&& i : portion->GetBlobIds()) {
+ BlobGuards[producedId]->Add(i.BlobSize(), i.BlobSize());
+ }
+ PortionRecordCountGuards[producedId]->Add(portion->GetRecordsCount(), 1);
+ PortionSizeGuards[producedId]->Add(portion->GetBlobBytes(), 1);
+}
+
+void TEngineLogsCounters::TPortionsInfoGuard::OnDropPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const {
+ const ui32 producedId = (ui32)(portion->HasRemoveSnapshot() ? NOlap::NPortion::EProduced::INACTIVE : portion->GetMeta().Produced);
+ Y_ABORT_UNLESS(producedId < BlobGuards.size());
+ for (auto&& i : portion->GetBlobIds()) {
+ BlobGuards[producedId]->Sub(i.BlobSize(), i.BlobSize());
+ }
+ PortionRecordCountGuards[producedId]->Sub(portion->GetRecordsCount(), 1);
+ PortionSizeGuards[producedId]->Sub(portion->GetBlobBytes(), 1);
+}
+
}
diff --git a/ydb/core/tx/columnshard/counters/engine_logs.h b/ydb/core/tx/columnshard/counters/engine_logs.h
index 003dcc2eaa9..4f36e28afbc 100644
--- a/ydb/core/tx/columnshard/counters/engine_logs.h
+++ b/ydb/core/tx/columnshard/counters/engine_logs.h
@@ -5,6 +5,10 @@
#include <util/string/builder.h>
#include <set>
+namespace NKikimr::NOlap {
+class TPortionInfo;
+}
+
namespace NKikimr::NColumnShard {
class TBaseGranuleDataClassSummary {
@@ -244,6 +248,8 @@ private:
TAgentGranuleDataCounters GranuleDataAgent;
std::vector<std::shared_ptr<TIncrementalHistogram>> BlobSizeDistribution;
+ std::vector<std::shared_ptr<TIncrementalHistogram>> PortionSizeDistribution;
+ std::vector<std::shared_ptr<TIncrementalHistogram>> PortionRecordsDistribution;
public:
NMonitoring::TDynamicCounters::TCounterPtr OverloadGranules;
NMonitoring::TDynamicCounters::TCounterPtr CompactOverloadGranulesSelection;
@@ -253,29 +259,33 @@ public:
class TPortionsInfoGuard {
private:
- std::vector<std::shared_ptr<TIncrementalHistogram::TGuard>> Guards;
+ std::vector<std::shared_ptr<TIncrementalHistogram::TGuard>> BlobGuards;
+ std::vector<std::shared_ptr<TIncrementalHistogram::TGuard>> PortionRecordCountGuards;
+ std::vector<std::shared_ptr<TIncrementalHistogram::TGuard>> PortionSizeGuards;
public:
- TPortionsInfoGuard(const std::vector<std::shared_ptr<TIncrementalHistogram>>& distr)
+ TPortionsInfoGuard(const std::vector<std::shared_ptr<TIncrementalHistogram>>& distrBlobs,
+ const std::vector<std::shared_ptr<TIncrementalHistogram>>& distrPortionSize,
+ const std::vector<std::shared_ptr<TIncrementalHistogram>>& distrRecordsCount)
{
- for (auto&& i : distr) {
- Guards.emplace_back(i->BuildGuard());
+ for (auto&& i : distrBlobs) {
+ BlobGuards.emplace_back(i->BuildGuard());
+ }
+ for (auto&& i : distrPortionSize) {
+ PortionSizeGuards.emplace_back(i->BuildGuard());
+ }
+ for (auto&& i : distrRecordsCount) {
+ PortionRecordCountGuards.emplace_back(i->BuildGuard());
}
}
- void OnNewBlob(const NOlap::NPortion::EProduced produced, const ui64 size) const {
- Y_ABORT_UNLESS((ui32)produced < Guards.size());
- Guards[(ui32)produced]->Add(size, size);
- }
- void OnDropBlob(const NOlap::NPortion::EProduced produced, const ui64 size) const {
- Y_ABORT_UNLESS((ui32)produced < Guards.size());
- Guards[(ui32)produced]->Sub(size, size);
- }
+ void OnNewPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const;
+ void OnDropPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const;
};
TPortionsInfoGuard BuildPortionBlobsGuard() const {
- return TPortionsInfoGuard(BlobSizeDistribution);
+ return TPortionsInfoGuard(BlobSizeDistribution, PortionSizeDistribution, PortionRecordsDistribution);
}
TGranuleDataCounters RegisterGranuleDataCounters() const {
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index 0a07d0f0ac3..33d8e6ba65a 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -70,33 +70,50 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange
}
// Save new portions (their column records)
+ THashMap<ui64, std::shared_ptr<TGranuleMeta>> granules;
for (auto& portionInfoWithBlobs : AppendedPortions) {
auto& portionInfo = portionInfoWithBlobs.GetPortionInfo();
Y_ABORT_UNLESS(!portionInfo.Empty());
auto it = remapGranules.find(portionInfo.GetGranule());
if (it != remapGranules.end()) {
- portionInfo.SetGranule(it->second);
- }
- self.UpsertPortion(portionInfo);
- for (auto& record : portionInfo.Records) {
- self.ColumnsTable->Write(context.DB, portionInfo, record);
+ granules.emplace(it->second, self.GetGranulePtrVerified(it->second));
+ } else {
+ granules.emplace(portionInfo.GetGranule(), self.GetGranulePtrVerified(portionInfo.GetGranule()));
}
}
-
- auto g = self.GranulesStorage->StartPackModification();
for (auto& [_, portionInfo] : PortionsToRemove) {
- Y_ABORT_UNLESS(!portionInfo.Empty());
- Y_ABORT_UNLESS(portionInfo.HasRemoveSnapshot());
+ granules.emplace(portionInfo.GetGranule(), self.GetGranulePtrVerified(portionInfo.GetGranule()));
+ }
+ NJson::TJsonValue sbJson = NJson::JSON_MAP;
+ {
+ auto g = self.GranulesStorage->StartPackModification();
+
+ for (auto& [_, portionInfo] : PortionsToRemove) {
+ Y_ABORT_UNLESS(!portionInfo.Empty());
+ Y_ABORT_UNLESS(portionInfo.HasRemoveSnapshot());
- const ui64 granule = portionInfo.GetGranule();
- const ui64 portion = portionInfo.GetPortion();
+ const ui64 granule = portionInfo.GetGranule();
+ const ui64 portion = portionInfo.GetPortion();
- const TPortionInfo& oldInfo = self.GetGranuleVerified(granule).GetPortionVerified(portion);
+ const TPortionInfo& oldInfo = self.GetGranuleVerified(granule).GetPortionVerified(portion);
- self.UpsertPortion(portionInfo, &oldInfo);
+ self.UpsertPortion(portionInfo, &oldInfo);
- for (auto& record : portionInfo.Records) {
- self.ColumnsTable->Write(context.DB, portionInfo, record);
+ for (auto& record : portionInfo.Records) {
+ self.ColumnsTable->Write(context.DB, portionInfo, record);
+ }
+ }
+ for (auto& portionInfoWithBlobs : AppendedPortions) {
+ auto& portionInfo = portionInfoWithBlobs.GetPortionInfo();
+ Y_ABORT_UNLESS(!portionInfo.Empty());
+ auto it = remapGranules.find(portionInfo.GetGranule());
+ if (it != remapGranules.end()) {
+ portionInfo.SetGranule(it->second);
+ }
+ self.UpsertPortion(portionInfo);
+ for (auto& record : portionInfo.Records) {
+ self.ColumnsTable->Write(context.DB, portionInfo, record);
+ }
}
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index cd1ddc442c0..1cbf1132704 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -288,11 +288,15 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(const TCompactionLimits& limits, const THashSet<TPortionAddress>& busyPortions) noexcept {
auto granule = GranulesStorage->GetGranuleForCompaction(Granules);
if (!granule) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no granules for start compaction");
return nullptr;
}
granule->OnStartCompaction();
auto changes = granule->GetOptimizationTask(limits, granule, busyPortions);
NYDBTest::TControllers::GetColumnShardController()->OnStartCompaction(changes);
+ if (!changes) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot build optimization task for granule that need compaction")("weight", granule->GetCompactionPriority().DebugString());
+ }
return changes;
}
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index c8d9f49ed7b..843c3a098da 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -3,6 +3,7 @@
#include <library/cpp/actors/core/log.h>
#include "optimizer/intervals/optimizer.h"
#include "optimizer/levels/optimizer.h"
+#include "optimizer/lbuckets/optimizer.h"
namespace NKikimr::NOlap {
@@ -72,13 +73,7 @@ void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> port
if (portionAfter) {
AFL_VERIFY(PortionsByPK[portionAfter->IndexKeyStart()].emplace(portionAfter->GetPortion(), portionAfter).second);
- THashMap<TUnifiedBlobId, ui64> blobIdSize;
- for (auto&& i : portionAfter->Records) {
- blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size;
- }
- for (auto&& i : blobIdSize) {
- PortionInfoGuard.OnNewBlob(portionAfter->HasRemoveSnapshot() ? NPortion::EProduced::INACTIVE : portionAfter->GetMeta().Produced, i.second);
- }
+ PortionInfoGuard.OnNewPortion(portionAfter);
if (!portionAfter->HasRemoveSnapshot()) {
if (modificationGuard) {
modificationGuard->AddPortion(portionAfter);
@@ -111,13 +106,7 @@ void TGranuleMeta::OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> por
}
}
- THashMap<TUnifiedBlobId, ui64> blobIdSize;
- for (auto&& i : portionBefore->Records) {
- blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size;
- }
- for (auto&& i : blobIdSize) {
- PortionInfoGuard.OnDropBlob(portionBefore->HasRemoveSnapshot() ? NPortion::EProduced::INACTIVE : portionBefore->GetMeta().Produced, i.second);
- }
+ PortionInfoGuard.OnDropPortion(portionBefore);
if (!portionBefore->HasRemoveSnapshot()) {
OptimizerPlanner->StartModificationGuard().RemovePortion(portionBefore);
}
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index b98f488094b..550d9ea0daf 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -148,30 +148,6 @@ public:
}
};
-class TCompactionPriority {
-private:
- NStorageOptimizer::TOptimizationPriority Weight;
- TMonotonic ConstructionInstant = TMonotonic::Now();
-public:
- const NStorageOptimizer::TOptimizationPriority& GetWeight() const {
- return Weight;
- }
-
- TCompactionPriority(std::shared_ptr<NStorageOptimizer::IOptimizerPlanner> planner)
- : Weight(planner->GetUsefulMetric())
- {
-
- }
- bool operator<(const TCompactionPriority& item) const {
- return Weight < item.Weight;
- }
-
- TString DebugString() const {
- return TStringBuilder() << "summary:(" << Weight.DebugString() << ");";
- }
-
-};
-
class TGranuleMeta: TNonCopyable {
public:
enum class EActivity {
@@ -240,8 +216,15 @@ public:
TGranuleAdditiveSummary::ECompactionClass GetCompactionType(const TCompactionLimits& limits) const;
const TGranuleAdditiveSummary& GetAdditiveSummary() const;
- TCompactionPriority GetCompactionPriority() const {
- return TCompactionPriority(OptimizerPlanner);
+
+ NStorageOptimizer::TOptimizationPriority GetCompactionPriority() const {
+ return OptimizerPlanner->GetUsefulMetric();
+ }
+
+ void ActualizeOptimizer(const TInstant currentInstant) const {
+ if (currentInstant - OptimizerPlanner->GetActualizationInstant() > TDuration::Seconds(1)) {
+ OptimizerPlanner->Actualize(currentInstant);
+ }
}
bool NeedCompaction(const TCompactionLimits& limits) const {
diff --git a/ydb/core/tx/columnshard/engines/storage/storage.cpp b/ydb/core/tx/columnshard/engines/storage/storage.cpp
index b4f5561778e..543f042f527 100644
--- a/ydb/core/tx/columnshard/engines/storage/storage.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/storage.cpp
@@ -7,32 +7,29 @@ void TGranulesStorage::UpdateGranuleInfo(const TGranuleMeta& granule) {
PackModifiedGranules[granule.GetGranuleId()] = &granule;
return;
}
- {
- auto it = GranulesCompactionPriority.find(granule.GetGranuleId());
- auto gPriority = granule.GetCompactionPriority();
- if (it == GranulesCompactionPriority.end()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "UpdateGranuleInfo")("granule", granule.DebugString())("new_priority", gPriority.DebugString());
- it = GranulesCompactionPriority.emplace(granule.GetGranuleId(), gPriority).first;
- Y_ABORT_UNLESS(GranuleCompactionPrioritySorting[gPriority].emplace(granule.GetGranuleId()).second);
- } else {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "UpdateGranuleInfo")("granule", granule.DebugString())("new_priority", gPriority.DebugString())("old_priority", it->second.DebugString());
- auto itSorting = GranuleCompactionPrioritySorting.find(it->second);
- Y_ABORT_UNLESS(itSorting != GranuleCompactionPrioritySorting.end());
- Y_ABORT_UNLESS(itSorting->second.erase(granule.GetGranuleId()));
- if (itSorting->second.empty()) {
- GranuleCompactionPrioritySorting.erase(itSorting);
- }
- it->second = gPriority;
- Y_ABORT_UNLESS(GranuleCompactionPrioritySorting[gPriority].emplace(granule.GetGranuleId()).second);
- }
- }
}
std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCompaction(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules) const {
- if (!GranuleCompactionPrioritySorting.size()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no_granules_for_compaction");
- return {};
+ const TInstant now = TInstant::Now();
+ std::optional<NStorageOptimizer::TOptimizationPriority> priority;
+ std::shared_ptr<TGranuleMeta> granule;
+ for (auto&& i : granules) {
+ i.second->ActualizeOptimizer(now);
+ if (!priority || *priority < i.second->GetCompactionPriority()) {
+ priority = i.second->GetCompactionPriority();
+ granule = i.second;
+ }
+ }
+ if (!priority) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no_granules");
+ return nullptr;
}
+ if (priority->IsZero()) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "zero_priority");
+ return nullptr;
+ }
+ return granule;
+/*
for (auto it = GranuleCompactionPrioritySorting.rbegin(); it != GranuleCompactionPrioritySorting.rend(); ++it) {
if (it->first.GetWeight().IsZero()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "zero_granule_reached");
@@ -42,15 +39,21 @@ std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCom
for (auto&& i : it->second) {
auto itGranule = granules.find(i);
Y_ABORT_UNLESS(itGranule != granules.end());
-// if (TMonotonic::Now() - itGranule->second->GetLastCompactionInstant() > TDuration::Seconds(1) || it->first.GetWeight().GetInternalLevelWeight() > 5000000) {
+ if (it->first.GetWeight().GetInternalLevelWeight() > 0 * 1024 * 1024) {
+
+// if (it->first.GetWeight().GetInternalLevelWeight() / 10000000 > 100 ||
+// it->first.GetWeight().GetInternalLevelWeight() % 10000000 > 100000) {
+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "test_granule")("granule_stats", it->first.DebugString())("granule_id", i);
return itGranule->second;
-// } else {
-// AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "test_granule_skipped")("granule_stats", it->first.DebugString())("granule_id", i)("skip_reason", "too_early_and_low_critical");
-// }
+ } else {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "test_granule_skipped")("granule_stats", it->first.DebugString())("granule_id", i)("skip_reason", "too_early_and_low_critical");
+ break;
+ }
}
}
return {};
+*/
}
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/storage/storage.h b/ydb/core/tx/columnshard/engines/storage/storage.h
index 353a9b59573..1fa7b524c88 100644
--- a/ydb/core/tx/columnshard/engines/storage/storage.h
+++ b/ydb/core/tx/columnshard/engines/storage/storage.h
@@ -10,8 +10,6 @@ class TGranulesStorage {
private:
const TCompactionLimits Limits;
const NColumnShard::TEngineLogsCounters Counters;
- THashMap<ui64, TCompactionPriority> GranulesCompactionPriority;
- std::map<TCompactionPriority, std::set<ui64>> GranuleCompactionPrioritySorting;
std::shared_ptr<IStoragesManager> StoragesManager;
bool PackModificationFlag = false;
THashMap<ui64, const TGranuleMeta*> PackModifiedGranules;