diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-07 13:05:02 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-07 13:05:02 +0300 |
commit | aa3ba3163b1c51da3a1c01a4205f3c1b1f387928 (patch) | |
tree | 88d895b00645bdc9cfce59acb4e0d7b4237a570c | |
parent | ef854dbcf2e5f1dd1d75c96b2ea4fba819f7ae3f (diff) | |
download | ydb-aa3ba3163b1c51da3a1c01a4205f3c1b1f387928.tar.gz |
speedup compaction
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write_index.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 108 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 76 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/defs.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/defs.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 67 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_logic_logs.cpp | 13 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.h | 35 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/storage/granule.cpp | 103 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/storage/granule.h | 114 |
12 files changed, 357 insertions, 208 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index c7b59eb4bd5..5559af7dc0e 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -169,9 +169,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) // Eviction to S3. TTxExportFinish will delete src blob when dst blob get EEvictState::EXTERN state. } else if (!protectedBlobs.contains(blobId)) { // We could drop the blob immediately - if (!blobsToDrop.contains(blobId)) { + if (blobsToDrop.emplace(blobId).second) { LOG_S_TRACE("Delete evicted blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID()); - blobsToDrop.insert(blobId); } } @@ -181,9 +180,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) for (const auto& portionInfo : changes->PortionsToDrop) { for (const auto& rec : portionInfo.Records) { const auto& blobId = rec.BlobRange.BlobId; - if (!blobsToDrop.contains(blobId)) { + if (blobsToDrop.emplace(blobId).second) { LOG_S_TRACE("Delete blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID()); - blobsToDrop.insert(blobId); } } Self->IncCounter(COUNTER_RAW_BYTES_ERASED, portionInfo.RawBytesSum()); @@ -258,6 +256,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) } Self->FinishWriteIndex(ctx, Ev, ok, blobsWritten, bytesWritten); + Self->EnqueueProgressTx(ctx); return true; } @@ -291,19 +290,17 @@ void TColumnShard::FinishWriteIndex(const TActorContext& ctx, TEvPrivate::TEvWri TablesManager.MutablePrimaryIndex().FreeLocks(changes); if (changes->IsInsert()) { - ActiveIndexing = false; + BackgroundController.FinishIndexing(); IncCounter(ok ? COUNTER_INDEXING_SUCCESS : COUNTER_INDEXING_FAIL); IncCounter(COUNTER_INDEXING_BLOBS_WRITTEN, blobsWritten); IncCounter(COUNTER_INDEXING_BYTES_WRITTEN, bytesWritten); IncCounter(COUNTER_INDEXING_TIME, ev->Get()->Duration.MilliSeconds()); } else if (changes->IsCompaction()) { - ActiveCompaction--; - Y_VERIFY(changes->CompactionInfo); - bool inGranule = changes->CompactionInfo->InGranule(); + BackgroundController.FinishCompaction(changes->CompactionInfo->GetPlanCompaction()); - if (inGranule) { + if (changes->CompactionInfo->InGranule()) { IncCounter(ok ? COUNTER_COMPACTION_SUCCESS : COUNTER_COMPACTION_FAIL); IncCounter(COUNTER_COMPACTION_BLOBS_WRITTEN, blobsWritten); IncCounter(COUNTER_COMPACTION_BYTES_WRITTEN, bytesWritten); @@ -314,11 +311,11 @@ void TColumnShard::FinishWriteIndex(const TActorContext& ctx, TEvPrivate::TEvWri } IncCounter(COUNTER_COMPACTION_TIME, ev->Get()->Duration.MilliSeconds()); } else if (changes->IsCleanup()) { - ActiveCleanup = false; + BackgroundController.FinishCleanup(); IncCounter(ok ? COUNTER_CLEANUP_SUCCESS : COUNTER_CLEANUP_FAIL); } else if (changes->IsTtl()) { - ActiveTtl = false; + BackgroundController.FinishTtl(); IncCounter(ok ? COUNTER_TTL_SUCCESS : COUNTER_TTL_FAIL); IncCounter(COUNTER_EVICTION_BLOBS_WRITTEN, blobsWritten); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 75c04412999..34fe9d09952 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -589,17 +589,22 @@ void TColumnShard::ScheduleNextGC(const TActorContext& ctx, bool cleanupOnly) { UpdateBlobMangerCounters(); if (BlobManager->CanCollectGarbage(cleanupOnly)) { + BlobManager->GetCounters().StartCollection->Add(1); Execute(CreateTxRunGc(), ctx); + } else { + BlobManager->GetCounters().SkipCollection->Add(1); } } void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivity activity) { if (periodic) { if (LastPeriodicBackActivation > TInstant::Now() - ActivationPeriod) { + CSCounters.OnTooEarly(); return; } LastPeriodicBackActivation = TInstant::Now(); } + CSCounters.OnStartBackground(); SendPeriodicStats(); @@ -612,39 +617,14 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivit // Schedule either indexing or compaction. if (activity.HasIndexation() || activity.HasCompaction()) { [&] { - if (ActiveIndexing || ActiveCompaction > 0) { - LOG_S_DEBUG("Indexing or compaction already in progress at tablet " << TabletID()); + if (BackgroundController.IsIndexingActive()) { return; } - // Preventing conflicts between indexing and compaction leads to election between them. - // Indexing vs compaction probability depends on index and insert table overload status. - // Prefer compaction: 25% by default; 50% if IndexOverloaded(); 6.25% if InsertTableOverloaded(). - const bool preferIndexing = RandomNumber<ui32>(1000) < 750; - - if (preferIndexing) { - if (activity.HasIndexation()) { - if (SetupIndexation()) { - BackgroundActivation++; - return; - } - } - if (activity.HasCompaction()) { - if (SetupCompaction()) { - return; - } - } - } else { - if (activity.HasCompaction()) { - if (SetupCompaction()) { - BackgroundActivation++; - return; - } - } - if (activity.HasIndexation()) { - if (SetupIndexation()) { - return; - } - } + if (activity.HasCompaction()) { + SetupCompaction(); + } + if (activity.HasIndexation()) { + SetupIndexation(); } }(); } @@ -671,16 +651,17 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivit } } -bool TColumnShard::SetupIndexation() { +void TColumnShard::SetupIndexation() { + CSCounters.OnSetupIndexation(); ui32 blobs = 0; ui32 ignored = 0; ui64 size = 0; ui64 bytesToIndex = 0; std::vector<const NOlap::TInsertedData*> dataToIndex; dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT); - THashMap<ui64, ui64> overloadedPathGranules; for (auto it = InsertTable->GetPathPriorities().rbegin(); it != InsertTable->GetPathPriorities().rend(); ++it) { for (auto* pathInfo : it->second) { + const bool hasSplitCompaction = BackgroundController.HasSplitCompaction(pathInfo->GetPathId()); const bool granulesOverloaded = TablesManager.GetPrimaryIndex()->HasOverloadedGranules(pathInfo->GetPathId()); for (auto& data : pathInfo->GetCommitted()) { ui32 dataSize = data.BlobSize(); @@ -695,6 +676,11 @@ bool TColumnShard::SetupIndexation() { CSCounters.SkipIndexationInputDueToGranuleOverload(dataSize); continue; } + if (hasSplitCompaction) { + ++ignored; + CSCounters.SkipIndexationInputDueToSplitCompaction(dataSize); + continue; + } ++blobs; bytesToIndex += dataSize; dataToIndex.push_back(&data); @@ -702,12 +688,6 @@ bool TColumnShard::SetupIndexation() { } } - for (auto& [p, cnt] : overloadedPathGranules) { - ui64 pathId(p); - ui64 count(cnt); - LOG_S_INFO("Overloaded granules (" << count << ") for pathId " << pathId << " at tablet " << TabletID()); - } - if (bytesToIndex < (ui64)Limits.MinInsertBytes && blobs < TLimits::MIN_SMALL_BLOBS_TO_INSERT) { LOG_S_DEBUG("Few data for indexation (" << bytesToIndex << " bytes in " << blobs << " blobs, ignored " << ignored << ") at tablet " << TabletID()); @@ -715,9 +695,10 @@ bool TColumnShard::SetupIndexation() { // Force small indexations sometimes to keep BatchCache smaller if (!bytesToIndex || SkippedIndexations < TSettings::MAX_INDEXATIONS_TO_SKIP) { ++SkippedIndexations; - return false; + return; } } + CSCounters.IndexationInput(bytesToIndex); SkippedIndexations = 0; LOG_S_DEBUG("Prepare indexing " << bytesToIndex << " bytes in " << dataToIndex.size() << " batches of committed " @@ -739,11 +720,11 @@ bool TColumnShard::SetupIndexation() { auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(CompactionLimits.Get(), std::move(data)); if (!indexChanges) { LOG_S_NOTICE("Cannot prepare indexing at tablet " << TabletID()); - return false; + return; } auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); - ActiveIndexing = true; + BackgroundController.StartIndexing(); auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterIndexing, std::move(cachedBlobs)); if (Tiers) { @@ -751,17 +732,16 @@ bool TColumnShard::SetupIndexation() { } ActorContext().Send(IndexingActor, std::make_unique<TEvPrivate::TEvIndexing>(std::move(ev))); - return true; } -bool TColumnShard::SetupCompaction() { - std::vector<std::unique_ptr<TEvPrivate::TEvCompaction>> events; +void TColumnShard::SetupCompaction() { + CSCounters.OnSetupCompaction(); - while (ActiveCompaction < TSettings::MAX_ACTIVE_COMPACTIONS) { + while (BackgroundController.GetCompactionsCount() < TSettings::MAX_ACTIVE_COMPACTIONS) { auto limits = CompactionLimits.Get(); auto compactionInfo = TablesManager.MutablePrimaryIndex().Compact(limits); if (!compactionInfo) { - if (events.empty()) { + if (!BackgroundController.GetCompactionsCount()) { LOG_S_DEBUG("Compaction not started: no portions to compact at tablet " << TabletID()); } break; @@ -769,38 +749,41 @@ bool TColumnShard::SetupCompaction() { LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID()); + auto& g = compactionInfo->GetObject<NOlap::TGranuleMeta>(); + if (compactionInfo->InGranule()) { + CSCounters.OnInternalCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount()); + } else { + CSCounters.OnSplitCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount()); + } + ui64 outdatedStep = GetOutdatedStep(); + const NOlap::TPlanCompactionInfo planInfo = compactionInfo->GetPlanCompaction(); auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), NOlap::TSnapshot(outdatedStep, 0), limits); if (!indexChanges) { - if (events.empty()) { + if (!BackgroundController.GetCompactionsCount()) { LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID()); } break; } + BackgroundController.StartCompaction(planInfo); auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); - ActiveCompaction++; auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterCompaction); if (Tiers) { ev->SetTiering(Tiers->GetTiering()); } - events.push_back(std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager)); - } - - LOG_S_DEBUG("Compaction events " << events.size() << " ActiveCompaction " << ActiveCompaction << " at tablet " << TabletID()); - - for (auto& ev : events) { - ActorContext().Send(CompactionActor, std::move(ev)); + ActorContext().Send(CompactionActor, std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager)); } - return events.size() != 0; + LOG_S_DEBUG("ActiveCompactions: " << BackgroundController.GetCompactionsCount() << " at tablet " << TabletID()); } std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, bool force) { - if (ActiveTtl) { + CSCounters.OnSetupTtl(); + if (BackgroundController.IsTtlActive()) { LOG_S_DEBUG("TTL already in progress at tablet " << TabletID()); return {}; } @@ -814,7 +797,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u if (Tiers) { eviction = Tiers->GetTiering(); // TODO: pathIds } - TablesManager.AddTtls(eviction, TInstant::Now(), force); + TablesManager.AddTtls(eviction, AppData()->TimeProvider->Now(), force); } if (eviction.empty()) { @@ -842,14 +825,15 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u bool needWrites = !indexChanges->PortionsToEvict.empty(); - ActiveTtl = true; + BackgroundController.StartTtl(); auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false); ev->SetTiering(eviction); return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites); } std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { - if (ActiveCleanup) { + CSCounters.OnSetupCleanup(); + if (BackgroundController.IsCleanupActive()) { LOG_S_DEBUG("Cleanup already in progress at tablet " << TabletID()); return {}; } @@ -899,7 +883,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false); ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write - ActiveCleanup = true; + BackgroundController.StartCleanup(); return ev; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 226c99d0e4c..f2302d96466 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -331,6 +331,72 @@ private: } }; + class TBackgroundController { + private: + using TCurrentCompaction = THashMap<ui64, NOlap::TPlanCompactionInfo>; + bool ActiveIndexing = false; + TCurrentCompaction ActiveCompactionInfo; + bool ActiveCleanup = false; + bool ActiveTtl = false; + public: + void StartCompaction(const NOlap::TPlanCompactionInfo& info) { + Y_VERIFY(ActiveCompactionInfo.emplace(info.GetPathId(), info).second); + } + void FinishCompaction(const NOlap::TPlanCompactionInfo& info) { + Y_VERIFY(ActiveCompactionInfo.erase(info.GetPathId())); + } + const TCurrentCompaction& GetActiveCompaction() const { + return ActiveCompactionInfo; + } + ui32 GetCompactionsCount() const { + return ActiveCompactionInfo.size(); + } + + void StartIndexing() { + Y_VERIFY(!ActiveIndexing); + ActiveIndexing = true; + } + void FinishIndexing() { + Y_VERIFY(ActiveIndexing); + ActiveIndexing = false; + } + bool IsIndexingActive() const { + return ActiveIndexing; + } + + void StartCleanup() { + Y_VERIFY(!ActiveCleanup); + ActiveCleanup = true; + } + void FinishCleanup() { + Y_VERIFY(ActiveCleanup); + ActiveCleanup = false; + } + bool IsCleanupActive() const { + return ActiveCleanup; + } + + void StartTtl() { + Y_VERIFY(!ActiveTtl); + ActiveTtl = true; + } + void FinishTtl() { + Y_VERIFY(ActiveTtl); + ActiveTtl = false; + } + bool IsTtlActive() const { + return ActiveTtl; + } + + bool HasSplitCompaction(const ui64 pathId) const { + auto it = ActiveCompactionInfo.find(pathId); + if (it == ActiveCompactionInfo.end()) { + return false; + } + return !it->second.IsInternal(); + } + }; + using TSchemaPreset = TSchemaPreset; using TTableInfo = TTableInfo; @@ -353,7 +419,6 @@ private: ui64 WritesInFly = 0; ui64 OwnerPathId = 0; ui64 StatsReportRound = 0; - ui64 BackgroundActivation = 0; ui32 SkippedIndexations = TSettings::MAX_INDEXATIONS_TO_SKIP; // Force indexation on tablet init TString OwnerPath; @@ -401,11 +466,8 @@ private: THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId; TMultiMap<TRowVersion, TEvColumnShard::TEvRead::TPtr> WaitingReads; TMultiMap<TRowVersion, TEvColumnShard::TEvScan::TPtr> WaitingScans; - bool ActiveIndexing = false; - ui32 ActiveCompaction = 0; - bool ActiveCleanup = false; - bool ActiveTtl = false; ui32 ActiveEvictions = 0; + TBackgroundController BackgroundController; std::unique_ptr<TBlobManager> BlobManager; TInFlightReadsTracker InFlightReadsTracker; TSettings Settings; @@ -465,8 +527,8 @@ private: void ScheduleNextGC(const TActorContext& ctx, bool cleanupOnly = false); - bool SetupIndexation(); - bool SetupCompaction(); + void SetupIndexation(); + void SetupCompaction(); std::unique_ptr<TEvPrivate::TEvEviction> SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls = {}, bool force = false); std::unique_ptr<TEvPrivate::TEvWriteIndex> SetupCleanup(); diff --git a/ydb/core/tx/columnshard/defs.cpp b/ydb/core/tx/columnshard/defs.cpp index 1f061db77c4..5728645fab8 100644 --- a/ydb/core/tx/columnshard/defs.cpp +++ b/ydb/core/tx/columnshard/defs.cpp @@ -20,4 +20,12 @@ void TLimits::SetMaxBlobSize(const ui64 value) { MaxBlobSize = value; } +TLimits::TLimits() + : MinInsertBytes(MIN_BYTES_TO_INSERT, 1, 2 * MAX_BYTES_TO_INSERT) + , MaxInsertBytes(15 * MAX_BYTES_TO_INSERT, 0, 30 * MAX_BYTES_TO_INSERT) + , InsertTableSize(MIN_SMALL_BLOBS_TO_INSERT, 0, 1000) +{ + +} + } diff --git a/ydb/core/tx/columnshard/defs.h b/ydb/core/tx/columnshard/defs.h index 7717ea64a70..49f59feae87 100644 --- a/ydb/core/tx/columnshard/defs.h +++ b/ydb/core/tx/columnshard/defs.h @@ -47,11 +47,7 @@ struct TLimits { TControlWrapper MaxInsertBytes; TControlWrapper InsertTableSize; - TLimits() - : MinInsertBytes(MIN_BYTES_TO_INSERT, 1, 2 * MAX_BYTES_TO_INSERT) - , MaxInsertBytes(MAX_BYTES_TO_INSERT, 0, 2 * MAX_BYTES_TO_INSERT) - , InsertTableSize(MIN_SMALL_BLOBS_TO_INSERT, 0, 1000) - {} + TLimits(); void RegisterControls(TControlBoard& icb) { icb.RegisterSharedControl(MinInsertBytes, "ColumnShardControls.MinBytesToIndex"); diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 299d75dea0a..fbc893ae5a1 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -567,7 +567,7 @@ public: virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const { return GetIndexInfo().GetReplaceKey(); } virtual const std::shared_ptr<arrow::Schema>& GetSortingKey() const { return GetIndexInfo().GetSortingKey(); } virtual const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return GetIndexInfo().GetIndexKey(); } - virtual const THashSet<ui64>* GetOverloadedGranules(ui64 /*pathId*/) const { return nullptr; } + virtual const THashSet<ui64>* GetOverloadedGranules(const ui64 pathId) const = 0; bool HasOverloadedGranules(const ui64 pathId) const { return GetOverloadedGranules(pathId) != nullptr; } @@ -588,7 +588,7 @@ public: ui32 maxRecords) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<arrow::Schema>& schema, ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0; - virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0; + virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0; virtual void FreeLocks(std::shared_ptr<TColumnEngineChanges> changes) = 0; virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0; //virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 076100fe535..f838be03282 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -312,11 +312,12 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl if (spg->Empty()) { EmptyGranules.insert(granule); emptyGranulePaths.insert(spg->PathId()); - } else { - CleanupGranules.insert(granule); } for (const auto& [_, portionInfo] : spg->GetPortions()) { UpdatePortionStats(portionInfo, EStatsUpdateType::LOAD); + if (portionInfo.CheckForCleanup()) { + CleanupPortions.emplace(portionInfo.GetAddress()); + } } } @@ -472,7 +473,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T const TCompactionLimits& limits, THashSet<ui64>& pathsToDrop, ui32 maxRecords) { - auto changes = TChanges::BuildClenupChanges(DefaultMark(), snapshot, limits); + auto changes = TChanges::BuildCleanupChanges(DefaultMark(), snapshot, limits); ui32 affectedRecords = 0; // Add all portions from dropped paths @@ -514,39 +515,31 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T // Add stale portions of alive paths THashSet<ui64> cleanGranules; - for (ui64 granule : CleanupGranules) { - auto spg = Granules.find(granule)->second; - Y_VERIFY(spg); - - bool isClean = true; - for (auto& [portion, info] : spg->GetPortions()) { - if (info.IsActive() || dropPortions.contains(portion)) { + std::shared_ptr<TGranuleMeta> granuleMeta; + for (auto it = CleanupPortions.begin(); it != CleanupPortions.end();) { + if (!granuleMeta || granuleMeta->GetGranuleId() != it->GetGranuleId()) { + auto itGranule = Granules.find(it->GetGranuleId()); + if (itGranule == Granules.end()) { + it = CleanupPortions.erase(it); continue; } - - isClean = false; - if (info.GetXSnapshot() < snapshot) { - affectedRecords += info.NumRecords(); - changes->PortionsToDrop.push_back(info); - } - + granuleMeta = itGranule->second; + } + Y_VERIFY(granuleMeta); + auto* portionInfo = granuleMeta->GetPortionPointer(it->GetPortionId()); + if (!portionInfo) { + it = CleanupPortions.erase(it); + } else if (portionInfo->CheckForCleanup(snapshot)) { + affectedRecords += portionInfo->NumRecords(); + changes->PortionsToDrop.push_back(*portionInfo); + it = CleanupPortions.erase(it); if (affectedRecords > maxRecords) { + changes->NeedRepeat = true; break; } + } else if (portionInfo->CheckForCleanup()) { + ++it; } - - if (isClean) { - cleanGranules.insert(granule); - } - - if (affectedRecords > maxRecords) { - changes->NeedRepeat = true; - break; - } - } - - for (ui64 granule : cleanGranules) { - CleanupGranules.erase(granule); } return changes; @@ -593,12 +586,13 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash if (auto max = info.MaxValue(ttlColumnId)) { bool keep = NArrow::ScalarLess(expireTimestamp, max); - + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", allowDrop); if (keep && tryEvictPortion) { TString tierName; for (auto& tierRef : ttl.GetOrderedTiers()) { // TODO: lower/upper_bound + move into TEviction auto& tierInfo = tierRef.Get(); if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) { + SignalCounters.OnPortionNoTtlColumn(info.BlobsBytes()); continue; // Ignore tiers with bad ttl column } if (NArrow::ScalarLess(tierInfo.EvictScalar(schema), max)) { @@ -612,12 +606,17 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash bool needExport = ttl.NeedExport(tierName); changes->PortionsToEvict.emplace_back( info, TPortionEvictionFeatures(tierName, pathId, needExport)); + SignalCounters.OnPortionToEvict(info.BlobsBytes()); } } if (!keep && allowDrop) { dropBlobs += info.NumRecords(); changes->PortionsToDrop.push_back(info); + SignalCounters.OnPortionToDrop(info.BlobsBytes()); } + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max"); + SignalCounters.OnPortionNoBorder(info.BlobsBytes()); } } } @@ -660,7 +659,7 @@ std::vector<std::vector<std::pair<TMark, ui64>>> TColumnEngineForLogs::EmptyGran } bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, - const TSnapshot& snapshot) { + const TSnapshot& snapshot) noexcept { auto changes = std::static_pointer_cast<TChanges>(indexChanges); // Update tmp granules with real ids @@ -713,13 +712,13 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE if (changes->IsCompaction()) { Y_VERIFY(changes->CompactionInfo); for (auto& portionInfo : changes->SwitchedPortions) { - CleanupGranules.insert(portionInfo.Granule()); + CleanupPortions.insert(portionInfo.GetAddress()); } } return true; } -bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, const TSnapshot& snapshot, bool apply) { +bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, const TSnapshot& snapshot, bool apply) noexcept { const std::vector<TPortionInfo>* switchedPortions = nullptr; if (changes.IsCompaction()) { Y_VERIFY(changes.CompactionInfo); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 4a225e60dfe..5e8e1b53a9e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -102,7 +102,7 @@ public: return changes; } - static std::shared_ptr<TChanges> BuildClenupChanges(const TMark& defaultMark, const TSnapshot& initSnapshot, const TCompactionLimits& limits) { + static std::shared_ptr<TChanges> BuildCleanupChanges(const TMark& defaultMark, const TSnapshot& initSnapshot, const TCompactionLimits& limits) { auto changes = std::make_shared<TChanges>(TColumnEngineChanges::CLEANUP, defaultMark, limits, TPrivateTag()); changes->InitSnapshot = initSnapshot; return changes; @@ -197,7 +197,7 @@ public: return VersionedIndex; } - const THashSet<ui64>* GetOverloadedGranules(ui64 pathId) const override { + const THashSet<ui64>* GetOverloadedGranules(const ui64 pathId) const override { return GranulesStorage->GetOverloaded(pathId); } @@ -239,7 +239,7 @@ public: ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override; bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, - const TSnapshot& snapshot) override; + const TSnapshot& snapshot) noexcept override; void FreeLocks(std::shared_ptr<TColumnEngineChanges> changes) override; @@ -286,7 +286,7 @@ private: /// Set of empty granules. /// Just for providing count of empty granules. THashSet<ui64> EmptyGranules; - THashSet<ui64> CleanupGranules; + TSet<TPortionAddress> CleanupPortions; TColumnEngineStats Counters; ui64 LastPortion; ui64 LastGranule; @@ -314,7 +314,7 @@ private: PathGranules.clear(); PathStats.clear(); EmptyGranules.clear(); - CleanupGranules.clear(); + CleanupPortions.clear(); Counters.Clear(); LastPortion = 0; @@ -325,7 +325,7 @@ private: bool LoadGranules(IDbWrapper& db); bool LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs); bool LoadCounters(IDbWrapper& db); - bool ApplyChanges(IDbWrapper& db, const TChanges& changes, const TSnapshot& snapshot, bool apply); + bool ApplyChanges(IDbWrapper& db, const TChanges& changes, const TSnapshot& snapshot, bool apply) noexcept; void EraseGranule(ui64 pathId, ui64 granule, const TMark& mark); diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp index ef6a6649ff2..e4e693f1205 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp @@ -92,11 +92,11 @@ public: , Batch(batch) , Schema(schema) { - if (granuleMeta && granuleMeta->GetSummary().GetOther().GetRecordsCount()) { - SortedColumnIds = granuleMeta->GetSummary().GetColumnIdsSortedBySizeDescending(); + if (granuleMeta && granuleMeta->GetAdditiveSummary().GetOther().GetRecordsCount()) { + SortedColumnIds = granuleMeta->GetHardSummary().GetColumnIdsSortedBySizeDescending(); const auto biggestColumn = SortedColumnIds.front(); Y_VERIFY(biggestColumn.GetPackedBlobsSize()); - const double expectedPackedRecordSize = 1.0 * biggestColumn.GetPackedBlobsSize() / granuleMeta->GetSummary().GetOther().GetRecordsCount(); + const double expectedPackedRecordSize = 1.0 * biggestColumn.GetPackedBlobsSize() / granuleMeta->GetAdditiveSummary().GetOther().GetRecordsCount(); BaseStepRecordsCount = ExpectedBlobSize / expectedPackedRecordSize; for (ui32 i = 1; i < SortedColumnIds.size(); ++i) { Y_VERIFY(SortedColumnIds[i - 1].GetPackedBlobsSize() >= SortedColumnIds[i].GetPackedBlobsSize()); @@ -360,17 +360,10 @@ TConclusion<std::vector<TString>> TIndexationLogic::DoApply(std::shared_ptr<TCol changes->AddPathIfNotExists(pathId); // We could merge data here cause tablet limits indexing data portions -#if 0 - auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortDescription()); // insert: no replace - Y_VERIFY(merged); - Y_VERIFY_DEBUG(NArrow::IsSorted(merged, indexInfo.GetReplaceKey())); -#else auto merged = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription()); Y_VERIFY(merged); Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, resultSchema->GetIndexInfo().GetReplaceKey())); -#endif - auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], resultSchema->GetIndexInfo()); for (auto& [granule, batch] : granuleBatches) { auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, changes->GetGranuleMeta()); diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index bd96e230781..e943c29f710 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -221,6 +221,23 @@ struct TPortionMeta { } }; +class TPortionAddress { +private: + YDB_READONLY(ui64, GranuleId, 0); + YDB_READONLY(ui64, PortionId, 0); +public: + TPortionAddress(const ui64 granuleId, const ui64 portionId) + : GranuleId(granuleId) + , PortionId(portionId) + { + + } + + bool operator<(const TPortionAddress& item) const { + return std::tie(GranuleId, PortionId) < std::tie(item.GranuleId, item.PortionId); + } +}; + struct TPortionInfo { static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024; @@ -237,6 +254,18 @@ struct TPortionInfo { bool CanIntersectOthers() const { return !Valid() || IsInserted() || IsEvicted(); } size_t NumRecords() const { return Records.size(); } + bool CheckForCleanup(const TSnapshot& snapshot) const { + if (!CheckForCleanup()) { + return false; + } + + return GetXSnapshot() < snapshot; + } + + bool CheckForCleanup() const { + return !IsActive(); + } + bool AllowEarlyFilter() const { return Meta.Produced == TPortionMeta::COMPACTED || Meta.Produced == TPortionMeta::SPLIT_COMPACTED; @@ -261,6 +290,12 @@ struct TPortionInfo { return rec.Granule; } + TPortionAddress GetAddress() const { + Y_VERIFY(!Empty()); + auto& rec = Records[0]; + return TPortionAddress(rec.Granule, rec.Portion); + } + void SetGranule(ui64 granule) { for (auto& rec : Records) { rec.Granule = granule; diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index 43f973f9761..f41202c8cfc 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -4,22 +4,30 @@ namespace NKikimr::NOlap { bool TGranuleMeta::NeedSplit(const TCompactionLimits& limits, bool& inserted) const { - inserted = GetSummary().GetInserted().GetPortionsCount(); - bool differentBorders = GetSummary().GetDifferentBorders(); - if (GetSummary().GetActivePortionsCount() < 2) { + inserted = GetAdditiveSummary().GetInserted().GetPortionsCount(); + bool differentBorders = GetHardSummary().GetDifferentBorders(); + if (GetAdditiveSummary().GetActivePortionsCount() < 2) { inserted = false; return false; } - return differentBorders && (GetSummary().GetMaxColumnsSize() >= limits.GranuleBlobSplitSize || GetSummary().GetGranuleSize() >= limits.GranuleOverloadSize); + return differentBorders && (GetAdditiveSummary().GetMaxColumnsSize() >= limits.GranuleBlobSplitSize || + GetAdditiveSummary().GetGranuleSize() >= limits.GranuleOverloadSize); } ui64 TGranuleMeta::Size() const { - return GetSummary().GetGranuleSize(); + return GetAdditiveSummary().GetGranuleSize(); } void TGranuleMeta::UpsertPortion(const TPortionInfo& info) { - Portions[info.Portion()] = info; - OnAfterChangePortion(info.Portion()); + auto it = Portions.find(info.Portion()); + if (it == Portions.end()) { + OnBeforeChangePortion(nullptr, &info); + Portions.emplace(info.Portion(), info); + } else { + OnBeforeChangePortion(&it->second, &info); + it->second = info; + } + OnAfterChangePortion(); } bool TGranuleMeta::ErasePortion(const ui64 portion) { @@ -28,21 +36,37 @@ bool TGranuleMeta::ErasePortion(const ui64 portion) { return false; } AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", it->second)("pathId", Record.PathId); + OnBeforeChangePortion(&it->second, nullptr); Portions.erase(it); - OnAfterChangePortion(portion); + OnAfterChangePortion(); return true; } void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec) { - Portions[rec.Portion].AddRecord(indexInfo, rec); - OnAfterChangePortion(rec.Portion); + auto& portion = Portions[rec.Portion]; + auto portionNew = portion; + portionNew.AddRecord(indexInfo, rec); + OnBeforeChangePortion(&portion, &portionNew); + portion = std::move(portionNew); + OnAfterChangePortion(); } -void TGranuleMeta::OnAfterChangePortion(const ui64 /*portion*/) { - ResetCaches(); +void TGranuleMeta::OnAfterChangePortion() { Owner->UpdateGranuleInfo(*this); } +void TGranuleMeta::OnBeforeChangePortion(const TPortionInfo* portionBefore, const TPortionInfo* portionAfter) { + HardSummaryCache = {}; + if (!!AdditiveSummaryCache) { + if (portionBefore && portionBefore->IsActive()) { + AdditiveSummaryCache->RemovePortion(*portionBefore); + } + if (portionAfter && portionAfter->IsActive()) { + AdditiveSummaryCache->AddPortion(*portionAfter); + } + } +} + void TGranuleMeta::OnCompactionFinished() { AllowInsertionFlag = false; Y_VERIFY(Activity.erase(EActivity::InternalCompaction) || Activity.erase(EActivity::SplitCompaction)); @@ -77,38 +101,27 @@ void TGranuleMeta::OnCompactionStarted(const bool inGranule) { } } -void TGranuleMeta::RebuildMetrics() const { - TGranuleSummary result; +void TGranuleMeta::RebuildHardMetrics() const { + TGranuleHardSummary result; std::map<ui32, TColumnSummary> packedSizeByColumns; bool differentBorders = false; THashSet<NArrow::TReplaceKey> borders; for (auto&& i : Portions) { - if (i.second.IsActive()) { - if (!differentBorders) { - borders.insert(i.second.IndexKeyStart()); - borders.insert(i.second.IndexKeyEnd()); - differentBorders = (borders.size() > 1); - } - auto sizes = i.second.BlobsSizes(); - for (auto&& c : i.second.Records) { - auto it = packedSizeByColumns.find(c.ColumnId); - if (it == packedSizeByColumns.end()) { - it = packedSizeByColumns.emplace(c.ColumnId, TColumnSummary(c.ColumnId)).first; - } - it->second.AddData(i.second.IsInserted(), c.BlobRange.Size, i.second.NumRows()); - } - if (i.second.IsInserted()) { - result.Inserted.PortionsSize += sizes.first; - result.Inserted.MaxColumnsSize += sizes.second; - result.Inserted.RecordsCount += i.second.NumRows(); - ++result.Inserted.PortionsCount; - } else { - result.Other.PortionsSize += sizes.first; - result.Other.MaxColumnsSize += sizes.second; - result.Other.RecordsCount += i.second.NumRows(); - ++result.Other.PortionsCount; + if (!i.second.IsActive()) { + continue; + } + if (!differentBorders) { + borders.insert(i.second.IndexKeyStart()); + borders.insert(i.second.IndexKeyEnd()); + differentBorders = (borders.size() > 1); + } + for (auto&& c : i.second.Records) { + auto it = packedSizeByColumns.find(c.ColumnId); + if (it == packedSizeByColumns.end()) { + it = packedSizeByColumns.emplace(c.ColumnId, TColumnSummary(c.ColumnId)).first; } + it->second.AddData(i.second.IsInserted(), c.BlobRange.Size, i.second.NumRows()); } } { @@ -124,7 +137,19 @@ void TGranuleMeta::RebuildMetrics() const { std::swap(result.ColumnIdsSortedBySizeDescending, transpSorted); } result.DifferentBorders = differentBorders; - SummaryCache = result; + HardSummaryCache = result; +} + +void TGranuleMeta::RebuildAdditiveMetrics() const { + TGranuleAdditiveSummary result; + + for (auto&& i : Portions) { + if (!i.second.IsActive()) { + continue; + } + result.AddPortion(i.second); + } + AdditiveSummaryCache = result; } } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 1446af52ffb..054f73de995 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -32,25 +32,45 @@ public: class TDataClassSummary { private: - ui64 PortionsSize = 0; - ui64 MaxColumnsSize = 0; - ui64 PortionsCount = 0; - ui64 RecordsCount = 0; + i64 PortionsSize = 0; + i64 MaxColumnsSize = 0; + i64 PortionsCount = 0; + i64 RecordsCount = 0; friend class TGranuleMeta; public: - ui64 GetPortionsSize() const { + i64 GetPortionsSize() const { return PortionsSize; } - ui64 GetRecordsCount() const { + i64 GetRecordsCount() const { return RecordsCount; } - ui64 GetMaxColumnsSize() const { + i64 GetMaxColumnsSize() const { return MaxColumnsSize; } - ui64 GetPortionsCount() const { + i64 GetPortionsCount() const { return PortionsCount; } + void AddPortion(const TPortionInfo& info) { + const auto sizes = info.BlobsSizes(); + PortionsSize += sizes.first; + MaxColumnsSize += sizes.second; + RecordsCount += info.NumRows(); + ++PortionsCount; + } + + void RemovePortion(const TPortionInfo& info) { + const auto sizes = info.BlobsSizes(); + PortionsSize -= sizes.first; + Y_VERIFY(PortionsSize >= 0); + MaxColumnsSize -= sizes.second; + Y_VERIFY(MaxColumnsSize >= 0); + RecordsCount -= info.NumRows(); + Y_VERIFY(RecordsCount >= 0); + --PortionsCount; + Y_VERIFY(PortionsCount >= 0); + } + TDataClassSummary operator+(const TDataClassSummary& item) const { TDataClassSummary result; result.PortionsSize = PortionsSize + item.PortionsSize; @@ -106,26 +126,32 @@ public: } }; -class TGranuleSummary { +class TGranuleHardSummary { private: - TDataClassSummary Inserted; - TDataClassSummary Other; - friend class TGranuleMeta; std::vector<TColumnSummary> ColumnIdsSortedBySizeDescending; bool DifferentBorders = false; + friend class TGranuleMeta; public: + bool GetDifferentBorders() const { + return DifferentBorders; + } const std::vector<TColumnSummary>& GetColumnIdsSortedBySizeDescending() const { return ColumnIdsSortedBySizeDescending; } +}; + +class TGranuleAdditiveSummary { +private: + TDataClassSummary Inserted; + TDataClassSummary Other; + friend class TGranuleMeta; +public: const TDataClassSummary& GetInserted() const { return Inserted; } const TDataClassSummary& GetOther() const { return Other; } - bool GetDifferentBorders() const { - return DifferentBorders; - } ui64 GetGranuleSize() const { return (Inserted + Other).GetPortionsSize(); } @@ -135,23 +161,34 @@ public: ui64 GetMaxColumnsSize() const { return (Inserted + Other).GetMaxColumnsSize(); } + void AddPortion(const TPortionInfo& info) { + if (info.IsInserted()) { + Inserted.AddPortion(info); + } else { + Other.AddPortion(info); + } + } + void RemovePortion(const TPortionInfo& info) { + if (info.IsInserted()) { + Inserted.RemovePortion(info); + } else { + Other.RemovePortion(info); + } + } }; class TCompactionPriority: public TCompactionPriorityInfo { private: using TBase = TCompactionPriorityInfo; - TGranuleSummary GranuleSummary; + TGranuleAdditiveSummary GranuleSummary; ui64 GetWeightCorrected() const { - if (!GranuleSummary.GetDifferentBorders()) { - return 0; - } if (GranuleSummary.GetActivePortionsCount() <= 1) { return 0; } return GranuleSummary.GetGranuleSize() * GranuleSummary.GetActivePortionsCount() * GranuleSummary.GetActivePortionsCount(); } public: - TCompactionPriority(const TCompactionPriorityInfo& data, const TGranuleSummary& granuleSummary) + TCompactionPriority(const TCompactionPriorityInfo& data, const TGranuleAdditiveSummary& granuleSummary) : TBase(data) , GranuleSummary(granuleSummary) { @@ -167,13 +204,11 @@ public: class TGranuleMeta: public ICompactionObjectCallback, TNonCopyable { private: THashMap<ui64, TPortionInfo> Portions; // portion -> portionInfo - mutable std::optional<TGranuleSummary> SummaryCache; + mutable std::optional<TGranuleAdditiveSummary> AdditiveSummaryCache; + mutable std::optional<TGranuleHardSummary> HardSummaryCache; bool NeedSplit(const TCompactionLimits& limits, bool& inserted) const; - void RebuildMetrics() const; - - void ResetCaches() { - SummaryCache = {}; - } + void RebuildHardMetrics() const; + void RebuildAdditiveMetrics() const; enum class EActivity { SplitCompaction, @@ -185,16 +220,23 @@ private: mutable bool AllowInsertionFlag = false; std::shared_ptr<TGranulesStorage> Owner; - void OnAfterChangePortion(const ui64 portion); + void OnBeforeChangePortion(const TPortionInfo* portionBefore, const TPortionInfo* portionAfter); + void OnAfterChangePortion(); public: - const TGranuleSummary& GetSummary() const { - if (!SummaryCache) { - RebuildMetrics(); + const TGranuleHardSummary& GetHardSummary() const { + if (!HardSummaryCache) { + RebuildHardMetrics(); + } + return *HardSummaryCache; + } + const TGranuleAdditiveSummary& GetAdditiveSummary() const { + if (!AdditiveSummaryCache) { + RebuildAdditiveMetrics(); } - return *SummaryCache; + return *AdditiveSummaryCache; } TCompactionPriority GetCompactionPriority() const { - return TCompactionPriority(CompactionPriorityInfo, GetSummary()); + return TCompactionPriority(CompactionPriorityInfo, GetAdditiveSummary()); } bool NeedCompaction(const TCompactionLimits& limits) const { @@ -252,6 +294,14 @@ public: return it->second; } + const TPortionInfo* GetPortionPointer(const ui64 portion) const { + auto it = Portions.find(portion); + if (it == Portions.end()) { + return nullptr; + } + return &it->second; + } + bool ErasePortion(const ui64 portion); explicit TGranuleMeta(const TGranuleRecord& rec, std::shared_ptr<TGranulesStorage> owner) |