diff options
author | alexvru <alexvru@ydb.tech> | 2022-09-01 20:09:02 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-09-01 20:09:02 +0300 |
commit | f64eb8ed70d8ec6b2761cd046be5a9ecfce78d07 (patch) | |
tree | c3fa9307d87597ef478aa8c93ed87dc5acd6f224 | |
parent | 7cfd122cb661b485fb0fb4139573a0beac97ae48 (diff) | |
download | ydb-f64eb8ed70d8ec6b2761cd046be5a9ecfce78d07.tar.gz |
Fix long compaction
5 files changed, 27 insertions, 19 deletions
diff --git a/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp b/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp index ecb91e1b2be..866dea43dd3 100644 --- a/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp +++ b/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp @@ -151,6 +151,7 @@ struct TPDiskMockState::TImpl { void DeleteChunk(TOwner& owner, TChunkIdx chunkIdx) { const ui32 num = owner.ReservedChunks.erase(chunkIdx) + owner.CommittedChunks.erase(chunkIdx); Y_VERIFY(num); + owner.ChunkData.erase(chunkIdx); const bool inserted = FreeChunks.insert(chunkIdx).second; Y_VERIFY(inserted); AdjustFreeChunks(); diff --git a/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_data.h b/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_data.h index 0ff369c0882..64e0f4ed64e 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_data.h +++ b/ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_data.h @@ -55,7 +55,7 @@ namespace NKikimr { void PutAppendix(std::shared_ptr<TFreshAppendix> &&a, ui64 firstLsn, ui64 lastLsn); // Compaction - bool NeedsCompaction(ui64 yardFreeUpToLsn) const; + bool NeedsCompaction(ui64 yardFreeUpToLsn, bool force) const; TIntrusivePtr<TFreshSegment> FindSegmentForCompaction(); void CompactionSstCreated(TIntrusivePtr<TFreshSegment> &&freshSegment); void CompactionFinished(); @@ -112,9 +112,11 @@ namespace NKikimr { } template <class TKey, class TMemRec> - bool TFreshData<TKey, TMemRec>::NeedsCompaction(ui64 yardFreeUpToLsn) const { + bool TFreshData<TKey, TMemRec>::NeedsCompaction(ui64 yardFreeUpToLsn, bool force) const { if (CompactionInProgress()) { return false; + } else if (force) { + return true; } else { const bool compactDregByYard = UseDreg && Dreg && Dreg->NeedsCompactionByYard(yardFreeUpToLsn); const bool compactCurByYard = Cur && Cur->NeedsCompactionByYard(yardFreeUpToLsn); diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h b/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h index cb647db1bdf..de1affc0a11 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h +++ b/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h @@ -282,9 +282,9 @@ namespace NKikimr { } // Fresh Compaction - bool NeedsFreshCompaction(ui64 yardFreeUpToLsn) const { + bool NeedsFreshCompaction(ui64 yardFreeUpToLsn, bool force) const { Y_VERIFY_DEBUG(Loaded); - return Fresh.NeedsCompaction(yardFreeUpToLsn); + return Fresh.NeedsCompaction(yardFreeUpToLsn, force); } TIntrusivePtr<TFreshSegment> FindFreshSegmentForCompaction() { diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp index 31f2fe47977..7581fcd3c53 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp @@ -32,7 +32,7 @@ namespace NKikimr { ui64 requestId, const TActorId &recipient) { - FullCompactionAttrs = std::make_optional<NHullComp::TFullCompactionAttrs>(fullCompactionLsn, now); + FullCompactionAttrs.emplace(fullCompactionLsn, now); Requests.push_back({type, requestId, recipient}); } @@ -53,6 +53,11 @@ namespace NKikimr { } } + template<typename TRTCtx> + bool ForceFreshCompaction(const TRTCtx& rtCtx) const { + return Enabled() && !rtCtx->LevelIndex->IsWrittenToSstBeforeLsn(FullCompactionAttrs->FullCompactionLsn); + } + // returns FullCompactionAttrs for Level Compaction Selector // if Fresh segment before FullCompactionAttrs->FullCompationLsn has not been written to sst yet, // there is no profit in starting LevelCompaction, so we return std::optional<ui64>() @@ -81,7 +86,7 @@ namespace NKikimr { using TFreshCompaction = ::NKikimr::THullCompaction<TKey, TMemRec, TIterator>; auto &hullCtx = hullDs->HullCtx; - Y_VERIFY(hullCtx->FreshCompaction && rtCtx->LevelIndex->NeedsFreshCompaction(rtCtx->GetFreeUpToLsn())); + Y_VERIFY(hullCtx->FreshCompaction); // get fresh segment to compact TIntrusivePtr<TFreshSegment> freshSegment = rtCtx->LevelIndex->FindFreshSegmentForCompaction(); @@ -218,7 +223,7 @@ namespace NKikimr { void ScheduleCompaction(const TActorContext &ctx) { // schedule fresh if required - CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, RTCtx, ctx); + CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, RTCtx, ctx, FullCompactionState.ForceFreshCompaction(RTCtx)); if (!RunLevelCompactionSelector(ctx)) { ScheduleCompactionWakeup(ctx); } @@ -259,13 +264,9 @@ namespace NKikimr { NHullComp::EAction action = ev->Get()->Action; CompactionTask = std::move(ev->Get()->CompactionTask); - if (action != NHullComp::ActNothing) { - // log out decision - LOG_INFO(ctx, NKikimrServices::BS_HULLCOMP, - VDISKP(HullDs->HullCtx->VCtx, "%s: selected compaction %s", - PDiskSignatureForHullDbKey<TKey>().ToString().data(), - CompactionTask->ToString().data())); - } + LOG_LOG(ctx, action != NHullComp::ActNothing ? NLog::PRI_INFO : NLog::PRI_DEBUG, + NKikimrServices::BS_HULLCOMP, VDISKP(HullDs->HullCtx->VCtx, "%s: selected compaction %s", + PDiskSignatureForHullDbKey<TKey>().ToString().data(), CompactionTask->ToString().data())); FullCompactionState.Compacted(ctx, CompactionTask->FullCompactionInfo); @@ -518,7 +519,8 @@ namespace NKikimr { if (FullCompactionState.Enabled()) { ScheduleCompaction(ctx); } else { - CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, RTCtx, ctx); + CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, RTCtx, ctx, + FullCompactionState.ForceFreshCompaction(RTCtx)); } break; case THullCommitFinished::CommitAdvanceLsn: @@ -541,7 +543,8 @@ namespace NKikimr { const ui64 freeUpToLsn = ev->Get()->FreeUpToLsn; RTCtx->SetFreeUpToLsn(freeUpToLsn); // we check if we need to start fresh compaction, FreeUpToLsn influence our decision - const bool freshCompStarted = CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, RTCtx, ctx); + const bool freshCompStarted = CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, RTCtx, ctx, + FullCompactionState.ForceFreshCompaction(RTCtx)); // just for valid info output to the log bool moveEntryPointStarted = false; if (!freshCompStarted && !AdvanceCommitInProgress) { @@ -585,7 +588,8 @@ namespace NKikimr { const ui64 confirmedLsn = RTCtx->LsnMngr->GetConfirmedLsnForHull(); auto *msg = ev->Get(); STLOG(PRI_INFO, BS_HULLCOMP, VDHC01, VDISKP(HullDs->HullCtx->VCtx, "TEvHullCompact"), - (ConfirmedLsn, confirmedLsn), (Msg, *msg)); + (ConfirmedLsn, confirmedLsn), (Msg, *msg), + (CompState, TLevelIndexBase::LevelCompStateToStr(RTCtx->LevelIndex->GetCompState()))); Y_VERIFY(TKeyToEHullDbType<TKey>() == msg->Type); switch (msg->Mode) { diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h index 5749a70ecd2..fa7e1412480 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h @@ -90,10 +90,11 @@ namespace NKikimr { bool CompactFreshSegmentIfRequired( TIntrusivePtr<THullDs> &hullDs, std::shared_ptr<TLevelIndexRunTimeCtx<TKey, TMemRec>> &rtCtx, - const TActorContext &ctx) + const TActorContext &ctx, + bool force = false) { ui64 yardFreeUpToLsn = rtCtx->GetFreeUpToLsn(); - bool compact = hullDs->HullCtx->FreshCompaction && rtCtx->LevelIndex->NeedsFreshCompaction(yardFreeUpToLsn); + bool compact = hullDs->HullCtx->FreshCompaction && rtCtx->LevelIndex->NeedsFreshCompaction(yardFreeUpToLsn, force); if (compact) { CompactFreshSegment<TKey, TMemRec>(hullDs, rtCtx, ctx); } |