diff options
5 files changed, 110 insertions, 80 deletions
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp index 49f2f18b0e..86443524a2 100644 --- a/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp +++ b/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp @@ -105,7 +105,7 @@ namespace NKikimr { void Handle(TEvTakeHullSnapshotResult::TPtr ev) { TDefragCalcStat calcStat(std::move(ev->Get()->Snap), DCtx->HugeBlobCtx); std::unique_ptr<IEventBase> res; - if (calcStat.Scan(TDuration::Seconds(30))) { + if (calcStat.Scan(NDefrag::MaxSnapshotHoldDuration)) { STLOG(PRI_ERROR, BS_VDISK_DEFRAG, BSVDD05, VDISKP(DCtx->VCtx->VDiskLogPrefix, "scan timed out")); } else { const ui32 totalChunks = calcStat.GetTotalChunks(); diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp index c3bc4e8c9d..4f907a2fb0 100644 --- a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp +++ b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp @@ -24,10 +24,6 @@ namespace NKikimr { EvResume = EventSpaceBegin(TEvents::ES_PRIVATE) }; - struct TExTimeout {}; - - static constexpr TDuration MaxSnapshotHoldDuration = TDuration::Seconds(30); - public: TDefragQuantum(const std::shared_ptr<TDefragCtx>& dctx, const TVDiskID& selfVDiskId, std::optional<TChunksToDefrag> chunksToDefrag) @@ -48,7 +44,11 @@ namespace NKikimr { Y_VERIFY(*ChunksToDefrag); } else { TDefragQuantumFindChunks findChunks(GetSnapshot(), DCtx->HugeBlobCtx); - while (findChunks.Scan(TDuration::MilliSeconds(10))) { + const ui64 endTime = GetCycleCountFast() + DurationToCycles(NDefrag::MaxSnapshotHoldDuration); + while (findChunks.Scan(NDefrag::WorkQuantum)) { + if (GetCycleCountFast() >= endTime) { + return (void)Send(ParentActorId, new TEvDefragQuantumResult(std::move(stat))); + } Yield(); } ChunksToDefrag.emplace(findChunks.GetChunksToDefrag(DCtx->MaxChunksToDefrag)); @@ -60,39 +60,27 @@ namespace NKikimr { auto lockedChunks = LockChunks(*ChunksToDefrag); - THPTimer timer; - TDefragQuantumFindRecords findRecords(GetSnapshot(), std::move(*ChunksToDefrag)); - Schedule(MaxSnapshotHoldDuration, new TEvents::TEvWakeup); - try { - findRecords.Scan(TDuration::MilliSeconds(10), std::bind(&TDefragQuantum::Yield, this)); - - const TActorId rewriterActorId = Register(CreateDefragRewriter(DCtx, SelfVDiskId, SelfActorId, - findRecords.RetrieveSnapshot(), findRecords.GetRecordsToRewrite())); - THolder<TEvDefragRewritten::THandle> ev; - try { - ev = WaitForSpecificEvent<TEvDefragRewritten>(); - } catch (const TPoisonPillException&) { - Send(new IEventHandle(TEvents::TSystem::Poison, 0, rewriterActorId, {}, nullptr, 0)); - throw; - } catch (const TExTimeout&) { - Send(new IEventHandle(TEvents::TSystem::Poison, 0, rewriterActorId, {}, nullptr, 0)); - throw; - } - stat.RewrittenRecs = ev->Get()->RewrittenRecs; - stat.RewrittenBytes = ev->Get()->RewrittenBytes; - } catch (const TExTimeout&) { - Send(DCtx->HugeKeeperId, new TEvHugeUnlockChunks(std::move(lockedChunks))); - STLOG(PRI_ERROR, BS_VDISK_DEFRAG, BSVDD06, VDISKP(DCtx->VCtx->VDiskLogPrefix, "defrag worker timed out")); + TDefragQuantumFindRecords findRecords(std::move(*ChunksToDefrag), DCtx->VCtx->Top->GType); + while (findRecords.Scan(NDefrag::WorkQuantum, GetSnapshot())) { + Yield(); } + const TActorId rewriterActorId = Register(CreateDefragRewriter(DCtx, SelfVDiskId, SelfActorId, + findRecords.GetRecordsToRewrite())); + THolder<TEvDefragRewritten::THandle> ev; try { - Compact(); - - auto hugeStat = GetHugeStat(); - Y_VERIFY(hugeStat.LockedChunks.size() < 100); - } catch (const TExTimeout&) { - // ignore timeout + ev = WaitForSpecificEvent<TEvDefragRewritten>(); + } catch (const TPoisonPillException&) { + Send(new IEventHandle(TEvents::TSystem::Poison, 0, rewriterActorId, {}, nullptr, 0)); + throw; } + stat.RewrittenRecs = ev->Get()->RewrittenRecs; + stat.RewrittenBytes = ev->Get()->RewrittenBytes; + + Compact(); + + auto hugeStat = GetHugeStat(); + Y_VERIFY(hugeStat.LockedChunks.size() < 100); } Send(ParentActorId, new TEvDefragQuantumResult(std::move(stat))); diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp index 71f47d5183..83bcfa1572 100644 --- a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp +++ b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp @@ -1,4 +1,5 @@ #include "defrag_rewriter.h" +#include <ydb/core/blobstorage/vdisk/skeleton/blobstorage_takedbsnap.h> namespace NKikimr { @@ -13,7 +14,7 @@ namespace NKikimr { const TVDiskID SelfVDiskId; const TActorId NotifyId; // we can rewrite data while we are holding snapshot for data being read - THullDsSnap FullSnap; + std::optional<THullDsSnap> FullSnap; std::vector<TDefragRecord> Recs; size_t RecToReadIdx = 0; size_t RewrittenRecsCounter = 0; @@ -26,19 +27,55 @@ namespace NKikimr { void SendNextRead(const TActorContext &ctx) { if (RecToReadIdx < Recs.size()) { - const TDiskPart &p = Recs[RecToReadIdx].OldDiskPart; + ctx.Send(DCtx->SkeletonId, new TEvTakeHullSnapshot(false)); + } else if (RewrittenRecsCounter == Recs.size()) { + ctx.Send(NotifyId, new TEvDefragRewritten(RewrittenRecsCounter, RewrittenBytes)); + Die(ctx); + } + } + + void Handle(TEvTakeHullSnapshotResult::TPtr ev, const TActorContext& ctx) { + FullSnap.emplace(std::move(ev->Get()->Snap)); + FullSnap->BlocksSnap.Destroy(); + FullSnap->BarriersSnap.Destroy(); + + TLogoBlobsSnapshot::TForwardIterator iter(FullSnap->HullCtx, &FullSnap->LogoBlobsSnap); + const TLogoBlobID id = Recs[RecToReadIdx].LogoBlobId; + iter.Seek(id.FullID()); + if (iter.Valid() && iter.GetCurKey().LogoBlobID() == id.FullID()) { + struct TCallback { + const ui32 PartId; + TDiskPart Location; + + void operator ()(const TDiskPart& data, const NMatrix::TVectorType v) { + if (v.Get(PartId - 1)) { + Y_VERIFY(v == NMatrix::TVectorType::MakeOneHot(PartId - 1, v.GetSize())); + Location = data; + } + } + + void operator ()(const TDiskBlob&) {} + } callback{id.PartId(), {}}; + TRecordMergerCallback<TKeyLogoBlob, TMemRecLogoBlob, TCallback> merger(&callback, DCtx->VCtx->Top->GType); + iter.PutToMerger(&merger); + merger.Finish(); + + const TDiskPart &p = callback.Location; auto msg = std::make_unique<NPDisk::TEvChunkRead>(DCtx->PDiskCtx->Dsk->Owner, DCtx->PDiskCtx->Dsk->OwnerRound, p.ChunkIdx, p.Offset, p.Size, NPriRead::HullComp, nullptr); ctx.Send(DCtx->PDiskCtx->PDiskId, msg.release()); DCtx->DefragMonGroup.DefragBytesRewritten() += p.Size; RewrittenBytes += p.Size; - } else if (RewrittenRecsCounter == Recs.size()) { - ctx.Send(NotifyId, new TEvDefragRewritten(RewrittenRecsCounter, RewrittenBytes)); - Die(ctx); + } else { + ++RecToReadIdx; + ++RewrittenRecsCounter; + SendNextRead(ctx); } } void Handle(NPDisk::TEvChunkReadResult::TPtr ev, const TActorContext& ctx) { + FullSnap.reset(); + // FIXME: handle read errors gracefully CHECK_PDISK_RESPONSE(DCtx->VCtx, ev, ctx); @@ -84,6 +121,7 @@ namespace NKikimr { STRICT_STFUNC(StateFunc, HFunc(TEvents::TEvPoisonPill, HandlePoison) + HFunc(TEvTakeHullSnapshotResult, Handle) HFunc(NPDisk::TEvChunkReadResult, Handle) HFunc(TEvBlobStorage::TEvVPutResult, Handle) ) @@ -99,19 +137,15 @@ namespace NKikimr { const std::shared_ptr<TDefragCtx> &dCtx, const TVDiskID &selfVDiskId, const TActorId ¬ifyId, - THullDsSnap &&fullSnap, std::vector<TDefragRecord> &&recs) : TActorBootstrapped<TDefragRewriter>() , DCtx(dCtx) , SelfVDiskId(selfVDiskId) , NotifyId(notifyId) - , FullSnap(std::move(fullSnap)) , Recs(std::move(recs)) {} }; - - //////////////////////////////////////////////////////////////////////////// // VDISK DEFRAG REWRITER // Rewrites selected huge blobs to free up some Huge Heap chunks @@ -120,9 +154,8 @@ namespace NKikimr { const std::shared_ptr<TDefragCtx> &dCtx, const TVDiskID &selfVDiskId, const TActorId ¬ifyId, - THullDsSnap &&fullSnap, std::vector<TDefragRecord> &&recs) { - return new TDefragRewriter(dCtx, selfVDiskId, notifyId, std::move(fullSnap), std::move(recs)); + return new TDefragRewriter(dCtx, selfVDiskId, notifyId, std::move(recs)); } } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h index 2f9e2a79db..dba032d87a 100644 --- a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h +++ b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h @@ -37,7 +37,6 @@ namespace NKikimr { const std::shared_ptr<TDefragCtx> &dCtx, const TVDiskID &selfVDiskId, const TActorId ¬ifyId, - THullDsSnap &&fullSnap, std::vector<TDefragRecord> &&recs); } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_search.h b/ydb/core/blobstorage/vdisk/defrag/defrag_search.h index 5578d0c240..9fccb8afe8 100644 --- a/ydb/core/blobstorage/vdisk/defrag/defrag_search.h +++ b/ydb/core/blobstorage/vdisk/defrag/defrag_search.h @@ -8,6 +8,11 @@ namespace NKikimr { + namespace NDefrag { + static constexpr TDuration MaxSnapshotHoldDuration = TDuration::Seconds(30); + static constexpr TDuration WorkQuantum = TDuration::MilliSeconds(10); + } + struct THugeBlobRecord { TDiskPart Part; TLogoBlobID Id; @@ -314,19 +319,16 @@ namespace NKikimr { using TLevelSegment = ::NKikimr::TLevelSegment<TKeyLogoBlob, TMemRecLogoBlob>; using TLevelSstPtr = typename TLevelSegment::TLevelSstPtr; - THullDsSnap FullSnap; TChunksToDefrag ChunksToDefrag; std::unordered_set<ui32> Chunks; // chunks to defrag (i.e. move all data from these chunks) std::vector<TDefragRecord> RecsToRewrite; - TDuration MaxTime; - std::function<void()> Yield; - ui64 EndTime; - ui32 Count; + std::optional<TLogoBlobID> NextId; + const TBlobStorageGroupType GType; public: - TDefragQuantumFindRecords(THullDsSnap&& fullSnap, TChunksToDefrag&& chunksToDefrag) - : FullSnap(std::move(fullSnap)) - , ChunksToDefrag(std::move(chunksToDefrag)) + TDefragQuantumFindRecords(TChunksToDefrag&& chunksToDefrag, TBlobStorageGroupType gtype) + : ChunksToDefrag(std::move(chunksToDefrag)) + , GType(gtype) { for (const auto& chunk : ChunksToDefrag.Chunks) { Chunks.insert(chunk.ChunkId); @@ -336,28 +338,50 @@ namespace NKikimr { RecsToRewrite.reserve(ChunksToDefrag.EstimatedSlotsCount); } - template<typename T> - void Scan(TDuration maxTime, T&& yield) { - MaxTime = maxTime; - Yield = yield; - EndTime = GetCycleCountFast() + DurationToCycles(MaxTime); - Count = 0; - TraverseDbWithoutMerge(FullSnap.HullCtx, this, FullSnap.LogoBlobsSnap); + bool Scan(TDuration quota, THullDsSnap fullSnap) { + // create iterator and set it up to point to next blob of interest + TLogoBlobsSnapshot::TForwardIterator iter(fullSnap.HullCtx, &fullSnap.LogoBlobsSnap); + if (NextId) { + iter.Seek(*NextId); + } else { + iter.SeekToFirst(); + } + + // calculate timestamp to finish scanning + const ui64 endTime = GetCycleCountFast() + DurationToCycles(quota); + for (ui32 count = 0; iter.Valid(); iter.Next()) { + if (++count % 1024 == 0 && GetCycleCountFast() >= endTime) { + break; + } + iter.PutToMerger(this); + } + + if (iter.Valid()) { + NextId.emplace(iter.GetCurKey().LogoBlobID()); + } + return iter.Valid(); } - void UpdateFresh(const char* /*segName*/, const TKeyLogoBlob& key, const TMemRecLogoBlob& memRec) { + void AddFromFresh(const TMemRecLogoBlob& memRec, const TRope* /*data*/, const TKeyLogoBlob& key, ui64 /*lsn*/) { Update(key, memRec, nullptr); } - void UpdateLevel(const TLevelSstPtr& p, const TKeyLogoBlob& key, const TMemRecLogoBlob& memRec) { - Update(key, memRec, p.SstPtr->GetOutbound()); + void AddFromSegment(const TMemRecLogoBlob& memRec, const TDiskPart *outbound, const TKeyLogoBlob& key, ui64 /*circaLsn*/) { + Update(key, memRec, outbound); } + static constexpr bool HaveToMergeData() { return false; } + + std::vector<TDefragRecord> GetRecordsToRewrite() { + return std::move(RecsToRewrite); + } + + private: void Update(const TKeyLogoBlob& key, const TMemRecLogoBlob& memRec, const TDiskPart *outbound) { TDiskDataExtractor extr; if (memRec.GetType() == TBlobType::HugeBlob || memRec.GetType() == TBlobType::ManyHugeBlobs) { memRec.GetDiskData(&extr, outbound); - const NMatrix::TVectorType local = memRec.GetIngress().LocalParts(FullSnap.HullCtx->VCtx->Top->GType); + const NMatrix::TVectorType local = memRec.GetIngress().LocalParts(GType); ui8 partIdx = local.FirstPosition(); for (const TDiskPart *p = extr.Begin; p != extr.End; ++p, partIdx = local.NextPosition(partIdx)) { Y_VERIFY(partIdx != local.GetSize()); @@ -373,20 +397,6 @@ namespace NKikimr { } } } - if (++Count % 1024 == 0 && GetCycleCountFast() >= EndTime) { - Yield(); - EndTime = GetCycleCountFast() + DurationToCycles(MaxTime); - } - } - - void Finish() {} - - THullDsSnap RetrieveSnapshot() { - return std::move(FullSnap); - } - - std::vector<TDefragRecord> GetRecordsToRewrite() { - return std::move(RecsToRewrite); } }; |