diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-20 17:35:44 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-20 18:56:53 +0300 |
commit | 3ccfbd6aca63fc40ba20bc9ab2840049d9bdd0fd (patch) | |
tree | 12e1ed2e7f382c457f2d5bb190772c6c092d3be2 | |
parent | fc0e05b8840f83b3cacf286352c4c8f741145176 (diff) | |
download | ydb-3ccfbd6aca63fc40ba20bc9ab2840049d9bdd0fd.tar.gz |
KIKIMR-19211: simple granules rating for actualization usage
8 files changed, 133 insertions, 96 deletions
diff --git a/ydb/core/tx/columnshard/counters/engine_logs.cpp b/ydb/core/tx/columnshard/counters/engine_logs.cpp index 7e0df8d3e67..a4e06b8644d 100644 --- a/ydb/core/tx/columnshard/counters/engine_logs.cpp +++ b/ydb/core/tx/columnshard/counters/engine_logs.cpp @@ -1,6 +1,7 @@ #include "engine_logs.h" #include <ydb/core/base/appdata.h> #include <ydb/core/base/counters.h> +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> #include <util/generic/serialized_enum.h> namespace NKikimr::NColumnShard { @@ -13,11 +14,23 @@ TEngineLogsCounters::TEngineLogsCounters() {2 * 1024 * 1024, "2Mb"}, {4 * 1024 * 1024, "4Mb"}, {5 * 1024 * 1024, "5Mb"}, {6 * 1024 * 1024, "6Mb"}, {7 * 1024 * 1024, "7Mb"}, {8 * 1024 * 1024, "8Mb"}}; + const std::map<i64, TString> portionSizeBorders = {{0, "0"}, {512 * 1024, "512kb"}, {1024 * 1024, "1Mb"}, + {2 * 1024 * 1024, "2Mb"}, {4 * 1024 * 1024, "4Mb"}, + {8 * 1024 * 1024, "8Mb"}, {16 * 1024 * 1024, "16Mb"}, + {32 * 1024 * 1024, "32Mb"}, {64 * 1024 * 1024, "64Mb"}}; + const std::set<i64> portionRecordBorders = {0, 2500, 5000, 7500, 9000, 10000, 20000, 40000, 80000, 160000}; for (auto&& i : GetEnumNames<NOlap::NPortion::EProduced>()) { if (BlobSizeDistribution.size() <= (ui32)i.first) { BlobSizeDistribution.resize((ui32)i.first + 1); + PortionSizeDistribution.resize((ui32)i.first + 1); + PortionRecordsDistribution.resize((ui32)i.first + 1); + } + if (PortionSizeDistribution.size() <= (ui32)i.first) { + PortionSizeDistribution.resize((ui32)i.first + 1); } BlobSizeDistribution[(ui32)i.first] = std::make_shared<TIncrementalHistogram>("EngineLogs", "BlobSizeDistribution", i.second, borders); + PortionSizeDistribution[(ui32)i.first] = std::make_shared<TIncrementalHistogram>("EngineLogs", "PortionSizeDistribution", i.second, portionSizeBorders); + PortionRecordsDistribution[(ui32)i.first] = std::make_shared<TIncrementalHistogram>("EngineLogs", "PortionRecordsDistribution", i.second, portionRecordBorders); } for (auto&& i : BlobSizeDistribution) { Y_ABORT_UNLESS(i); @@ -41,4 +54,24 @@ TEngineLogsCounters::TEngineLogsCounters() PortionNoBorderBytes = TBase::GetDeriviative("Ttl/PortionNoBorder/Bytes"); } +void TEngineLogsCounters::TPortionsInfoGuard::OnNewPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const { + const ui32 producedId = (ui32)(portion->HasRemoveSnapshot() ? NOlap::NPortion::EProduced::INACTIVE : portion->GetMeta().Produced); + Y_ABORT_UNLESS(producedId < BlobGuards.size()); + for (auto&& i : portion->GetBlobIds()) { + BlobGuards[producedId]->Add(i.BlobSize(), i.BlobSize()); + } + PortionRecordCountGuards[producedId]->Add(portion->GetRecordsCount(), 1); + PortionSizeGuards[producedId]->Add(portion->GetBlobBytes(), 1); +} + +void TEngineLogsCounters::TPortionsInfoGuard::OnDropPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const { + const ui32 producedId = (ui32)(portion->HasRemoveSnapshot() ? NOlap::NPortion::EProduced::INACTIVE : portion->GetMeta().Produced); + Y_ABORT_UNLESS(producedId < BlobGuards.size()); + for (auto&& i : portion->GetBlobIds()) { + BlobGuards[producedId]->Sub(i.BlobSize(), i.BlobSize()); + } + PortionRecordCountGuards[producedId]->Sub(portion->GetRecordsCount(), 1); + PortionSizeGuards[producedId]->Sub(portion->GetBlobBytes(), 1); +} + } diff --git a/ydb/core/tx/columnshard/counters/engine_logs.h b/ydb/core/tx/columnshard/counters/engine_logs.h index 003dcc2eaa9..4f36e28afbc 100644 --- a/ydb/core/tx/columnshard/counters/engine_logs.h +++ b/ydb/core/tx/columnshard/counters/engine_logs.h @@ -5,6 +5,10 @@ #include <util/string/builder.h> #include <set> +namespace NKikimr::NOlap { +class TPortionInfo; +} + namespace NKikimr::NColumnShard { class TBaseGranuleDataClassSummary { @@ -244,6 +248,8 @@ private: TAgentGranuleDataCounters GranuleDataAgent; std::vector<std::shared_ptr<TIncrementalHistogram>> BlobSizeDistribution; + std::vector<std::shared_ptr<TIncrementalHistogram>> PortionSizeDistribution; + std::vector<std::shared_ptr<TIncrementalHistogram>> PortionRecordsDistribution; public: NMonitoring::TDynamicCounters::TCounterPtr OverloadGranules; NMonitoring::TDynamicCounters::TCounterPtr CompactOverloadGranulesSelection; @@ -253,29 +259,33 @@ public: class TPortionsInfoGuard { private: - std::vector<std::shared_ptr<TIncrementalHistogram::TGuard>> Guards; + std::vector<std::shared_ptr<TIncrementalHistogram::TGuard>> BlobGuards; + std::vector<std::shared_ptr<TIncrementalHistogram::TGuard>> PortionRecordCountGuards; + std::vector<std::shared_ptr<TIncrementalHistogram::TGuard>> PortionSizeGuards; public: - TPortionsInfoGuard(const std::vector<std::shared_ptr<TIncrementalHistogram>>& distr) + TPortionsInfoGuard(const std::vector<std::shared_ptr<TIncrementalHistogram>>& distrBlobs, + const std::vector<std::shared_ptr<TIncrementalHistogram>>& distrPortionSize, + const std::vector<std::shared_ptr<TIncrementalHistogram>>& distrRecordsCount) { - for (auto&& i : distr) { - Guards.emplace_back(i->BuildGuard()); + for (auto&& i : distrBlobs) { + BlobGuards.emplace_back(i->BuildGuard()); + } + for (auto&& i : distrPortionSize) { + PortionSizeGuards.emplace_back(i->BuildGuard()); + } + for (auto&& i : distrRecordsCount) { + PortionRecordCountGuards.emplace_back(i->BuildGuard()); } } - void OnNewBlob(const NOlap::NPortion::EProduced produced, const ui64 size) const { - Y_ABORT_UNLESS((ui32)produced < Guards.size()); - Guards[(ui32)produced]->Add(size, size); - } - void OnDropBlob(const NOlap::NPortion::EProduced produced, const ui64 size) const { - Y_ABORT_UNLESS((ui32)produced < Guards.size()); - Guards[(ui32)produced]->Sub(size, size); - } + void OnNewPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const; + void OnDropPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const; }; TPortionsInfoGuard BuildPortionBlobsGuard() const { - return TPortionsInfoGuard(BlobSizeDistribution); + return TPortionsInfoGuard(BlobSizeDistribution, PortionSizeDistribution, PortionRecordsDistribution); } TGranuleDataCounters RegisterGranuleDataCounters() const { diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 0a07d0f0ac3..33d8e6ba65a 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -70,33 +70,50 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange } // Save new portions (their column records) + THashMap<ui64, std::shared_ptr<TGranuleMeta>> granules; for (auto& portionInfoWithBlobs : AppendedPortions) { auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); Y_ABORT_UNLESS(!portionInfo.Empty()); auto it = remapGranules.find(portionInfo.GetGranule()); if (it != remapGranules.end()) { - portionInfo.SetGranule(it->second); - } - self.UpsertPortion(portionInfo); - for (auto& record : portionInfo.Records) { - self.ColumnsTable->Write(context.DB, portionInfo, record); + granules.emplace(it->second, self.GetGranulePtrVerified(it->second)); + } else { + granules.emplace(portionInfo.GetGranule(), self.GetGranulePtrVerified(portionInfo.GetGranule())); } } - - auto g = self.GranulesStorage->StartPackModification(); for (auto& [_, portionInfo] : PortionsToRemove) { - Y_ABORT_UNLESS(!portionInfo.Empty()); - Y_ABORT_UNLESS(portionInfo.HasRemoveSnapshot()); + granules.emplace(portionInfo.GetGranule(), self.GetGranulePtrVerified(portionInfo.GetGranule())); + } + NJson::TJsonValue sbJson = NJson::JSON_MAP; + { + auto g = self.GranulesStorage->StartPackModification(); + + for (auto& [_, portionInfo] : PortionsToRemove) { + Y_ABORT_UNLESS(!portionInfo.Empty()); + Y_ABORT_UNLESS(portionInfo.HasRemoveSnapshot()); - const ui64 granule = portionInfo.GetGranule(); - const ui64 portion = portionInfo.GetPortion(); + const ui64 granule = portionInfo.GetGranule(); + const ui64 portion = portionInfo.GetPortion(); - const TPortionInfo& oldInfo = self.GetGranuleVerified(granule).GetPortionVerified(portion); + const TPortionInfo& oldInfo = self.GetGranuleVerified(granule).GetPortionVerified(portion); - self.UpsertPortion(portionInfo, &oldInfo); + self.UpsertPortion(portionInfo, &oldInfo); - for (auto& record : portionInfo.Records) { - self.ColumnsTable->Write(context.DB, portionInfo, record); + for (auto& record : portionInfo.Records) { + self.ColumnsTable->Write(context.DB, portionInfo, record); + } + } + for (auto& portionInfoWithBlobs : AppendedPortions) { + auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); + Y_ABORT_UNLESS(!portionInfo.Empty()); + auto it = remapGranules.find(portionInfo.GetGranule()); + if (it != remapGranules.end()) { + portionInfo.SetGranule(it->second); + } + self.UpsertPortion(portionInfo); + for (auto& record : portionInfo.Records) { + self.ColumnsTable->Write(context.DB, portionInfo, record); + } } } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index cd1ddc442c0..1cbf1132704 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -288,11 +288,15 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(const TCompactionLimits& limits, const THashSet<TPortionAddress>& busyPortions) noexcept { auto granule = GranulesStorage->GetGranuleForCompaction(Granules); if (!granule) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no granules for start compaction"); return nullptr; } granule->OnStartCompaction(); auto changes = granule->GetOptimizationTask(limits, granule, busyPortions); NYDBTest::TControllers::GetColumnShardController()->OnStartCompaction(changes); + if (!changes) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot build optimization task for granule that need compaction")("weight", granule->GetCompactionPriority().DebugString()); + } return changes; } diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index c8d9f49ed7b..843c3a098da 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -3,6 +3,7 @@ #include <library/cpp/actors/core/log.h> #include "optimizer/intervals/optimizer.h" #include "optimizer/levels/optimizer.h" +#include "optimizer/lbuckets/optimizer.h" namespace NKikimr::NOlap { @@ -72,13 +73,7 @@ void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> port if (portionAfter) { AFL_VERIFY(PortionsByPK[portionAfter->IndexKeyStart()].emplace(portionAfter->GetPortion(), portionAfter).second); - THashMap<TUnifiedBlobId, ui64> blobIdSize; - for (auto&& i : portionAfter->Records) { - blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size; - } - for (auto&& i : blobIdSize) { - PortionInfoGuard.OnNewBlob(portionAfter->HasRemoveSnapshot() ? NPortion::EProduced::INACTIVE : portionAfter->GetMeta().Produced, i.second); - } + PortionInfoGuard.OnNewPortion(portionAfter); if (!portionAfter->HasRemoveSnapshot()) { if (modificationGuard) { modificationGuard->AddPortion(portionAfter); @@ -111,13 +106,7 @@ void TGranuleMeta::OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> por } } - THashMap<TUnifiedBlobId, ui64> blobIdSize; - for (auto&& i : portionBefore->Records) { - blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size; - } - for (auto&& i : blobIdSize) { - PortionInfoGuard.OnDropBlob(portionBefore->HasRemoveSnapshot() ? NPortion::EProduced::INACTIVE : portionBefore->GetMeta().Produced, i.second); - } + PortionInfoGuard.OnDropPortion(portionBefore); if (!portionBefore->HasRemoveSnapshot()) { OptimizerPlanner->StartModificationGuard().RemovePortion(portionBefore); } diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index b98f488094b..550d9ea0daf 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -148,30 +148,6 @@ public: } }; -class TCompactionPriority { -private: - NStorageOptimizer::TOptimizationPriority Weight; - TMonotonic ConstructionInstant = TMonotonic::Now(); -public: - const NStorageOptimizer::TOptimizationPriority& GetWeight() const { - return Weight; - } - - TCompactionPriority(std::shared_ptr<NStorageOptimizer::IOptimizerPlanner> planner) - : Weight(planner->GetUsefulMetric()) - { - - } - bool operator<(const TCompactionPriority& item) const { - return Weight < item.Weight; - } - - TString DebugString() const { - return TStringBuilder() << "summary:(" << Weight.DebugString() << ");"; - } - -}; - class TGranuleMeta: TNonCopyable { public: enum class EActivity { @@ -240,8 +216,15 @@ public: TGranuleAdditiveSummary::ECompactionClass GetCompactionType(const TCompactionLimits& limits) const; const TGranuleAdditiveSummary& GetAdditiveSummary() const; - TCompactionPriority GetCompactionPriority() const { - return TCompactionPriority(OptimizerPlanner); + + NStorageOptimizer::TOptimizationPriority GetCompactionPriority() const { + return OptimizerPlanner->GetUsefulMetric(); + } + + void ActualizeOptimizer(const TInstant currentInstant) const { + if (currentInstant - OptimizerPlanner->GetActualizationInstant() > TDuration::Seconds(1)) { + OptimizerPlanner->Actualize(currentInstant); + } } bool NeedCompaction(const TCompactionLimits& limits) const { diff --git a/ydb/core/tx/columnshard/engines/storage/storage.cpp b/ydb/core/tx/columnshard/engines/storage/storage.cpp index b4f5561778e..543f042f527 100644 --- a/ydb/core/tx/columnshard/engines/storage/storage.cpp +++ b/ydb/core/tx/columnshard/engines/storage/storage.cpp @@ -7,32 +7,29 @@ void TGranulesStorage::UpdateGranuleInfo(const TGranuleMeta& granule) { PackModifiedGranules[granule.GetGranuleId()] = &granule; return; } - { - auto it = GranulesCompactionPriority.find(granule.GetGranuleId()); - auto gPriority = granule.GetCompactionPriority(); - if (it == GranulesCompactionPriority.end()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "UpdateGranuleInfo")("granule", granule.DebugString())("new_priority", gPriority.DebugString()); - it = GranulesCompactionPriority.emplace(granule.GetGranuleId(), gPriority).first; - Y_ABORT_UNLESS(GranuleCompactionPrioritySorting[gPriority].emplace(granule.GetGranuleId()).second); - } else { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "UpdateGranuleInfo")("granule", granule.DebugString())("new_priority", gPriority.DebugString())("old_priority", it->second.DebugString()); - auto itSorting = GranuleCompactionPrioritySorting.find(it->second); - Y_ABORT_UNLESS(itSorting != GranuleCompactionPrioritySorting.end()); - Y_ABORT_UNLESS(itSorting->second.erase(granule.GetGranuleId())); - if (itSorting->second.empty()) { - GranuleCompactionPrioritySorting.erase(itSorting); - } - it->second = gPriority; - Y_ABORT_UNLESS(GranuleCompactionPrioritySorting[gPriority].emplace(granule.GetGranuleId()).second); - } - } } std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCompaction(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules) const { - if (!GranuleCompactionPrioritySorting.size()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no_granules_for_compaction"); - return {}; + const TInstant now = TInstant::Now(); + std::optional<NStorageOptimizer::TOptimizationPriority> priority; + std::shared_ptr<TGranuleMeta> granule; + for (auto&& i : granules) { + i.second->ActualizeOptimizer(now); + if (!priority || *priority < i.second->GetCompactionPriority()) { + priority = i.second->GetCompactionPriority(); + granule = i.second; + } + } + if (!priority) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no_granules"); + return nullptr; } + if (priority->IsZero()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "zero_priority"); + return nullptr; + } + return granule; +/* for (auto it = GranuleCompactionPrioritySorting.rbegin(); it != GranuleCompactionPrioritySorting.rend(); ++it) { if (it->first.GetWeight().IsZero()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "zero_granule_reached"); @@ -42,15 +39,21 @@ std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCom for (auto&& i : it->second) { auto itGranule = granules.find(i); Y_ABORT_UNLESS(itGranule != granules.end()); -// if (TMonotonic::Now() - itGranule->second->GetLastCompactionInstant() > TDuration::Seconds(1) || it->first.GetWeight().GetInternalLevelWeight() > 5000000) { + if (it->first.GetWeight().GetInternalLevelWeight() > 0 * 1024 * 1024) { + +// if (it->first.GetWeight().GetInternalLevelWeight() / 10000000 > 100 || +// it->first.GetWeight().GetInternalLevelWeight() % 10000000 > 100000) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "test_granule")("granule_stats", it->first.DebugString())("granule_id", i); return itGranule->second; -// } else { -// AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "test_granule_skipped")("granule_stats", it->first.DebugString())("granule_id", i)("skip_reason", "too_early_and_low_critical"); -// } + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "test_granule_skipped")("granule_stats", it->first.DebugString())("granule_id", i)("skip_reason", "too_early_and_low_critical"); + break; + } } } return {}; +*/ } } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/storage.h b/ydb/core/tx/columnshard/engines/storage/storage.h index 353a9b59573..1fa7b524c88 100644 --- a/ydb/core/tx/columnshard/engines/storage/storage.h +++ b/ydb/core/tx/columnshard/engines/storage/storage.h @@ -10,8 +10,6 @@ class TGranulesStorage { private: const TCompactionLimits Limits; const NColumnShard::TEngineLogsCounters Counters; - THashMap<ui64, TCompactionPriority> GranulesCompactionPriority; - std::map<TCompactionPriority, std::set<ui64>> GranuleCompactionPrioritySorting; std::shared_ptr<IStoragesManager> StoragesManager; bool PackModificationFlag = false; THashMap<ui64, const TGranuleMeta*> PackModifiedGranules; |