about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp 2
-rw-r--r-- ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp 56
-rw-r--r-- ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp 55
-rw-r--r-- ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h 1
-rw-r--r-- ydb/core/blobstorage/vdisk/defrag/defrag_search.h 76
5 files changed, 110 insertions, 80 deletions
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp
index 49f2f18b0e..86443524a2 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp
@@ -105,7 +105,7 @@ namespace NKikimr {
void Handle(TEvTakeHullSnapshotResult::TPtr ev) {
TDefragCalcStat calcStat(std::move(ev->Get()->Snap), DCtx->HugeBlobCtx);
std::unique_ptr<IEventBase> res;
- if (calcStat.Scan(TDuration::Seconds(30))) {
+ if (calcStat.Scan(NDefrag::MaxSnapshotHoldDuration)) { // named limit replaces the literal; the constant is declared as 30 s in defrag_search.h
STLOG(PRI_ERROR, BS_VDISK_DEFRAG, BSVDD05, VDISKP(DCtx->VCtx->VDiskLogPrefix, "scan timed out"));
} else {
const ui32 totalChunks = calcStat.GetTotalChunks();
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
index c3bc4e8c9d..4f907a2fb0 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
@@ -24,10 +24,6 @@ namespace NKikimr {
EvResume = EventSpaceBegin(TEvents::ES_PRIVATE)
};
- struct TExTimeout {};
-
- static constexpr TDuration MaxSnapshotHoldDuration = TDuration::Seconds(30);
-
public:
TDefragQuantum(const std::shared_ptr<TDefragCtx>& dctx, const TVDiskID& selfVDiskId,
std::optional<TChunksToDefrag> chunksToDefrag)
@@ -48,7 +44,11 @@ namespace NKikimr {
Y_VERIFY(*ChunksToDefrag);
} else {
TDefragQuantumFindChunks findChunks(GetSnapshot(), DCtx->HugeBlobCtx);
- while (findChunks.Scan(TDuration::MilliSeconds(10))) {
+ const ui64 endTime = GetCycleCountFast() + DurationToCycles(NDefrag::MaxSnapshotHoldDuration); // cycle-counter deadline for the whole chunk search
+ while (findChunks.Scan(NDefrag::WorkQuantum)) { // each pass gets one WorkQuantum; the loop repeats while Scan returns true
+ if (GetCycleCountFast() >= endTime) { // deadline passed: stop searching
+ return (void)Send(ParentActorId, new TEvDefragQuantumResult(std::move(stat))); // report stat as it is to the parent and leave
+ }
Yield();
}
ChunksToDefrag.emplace(findChunks.GetChunksToDefrag(DCtx->MaxChunksToDefrag));
@@ -60,39 +60,27 @@ namespace NKikimr {
auto lockedChunks = LockChunks(*ChunksToDefrag);
- THPTimer timer;
- TDefragQuantumFindRecords findRecords(GetSnapshot(), std::move(*ChunksToDefrag));
- Schedule(MaxSnapshotHoldDuration, new TEvents::TEvWakeup);
- try {
- findRecords.Scan(TDuration::MilliSeconds(10), std::bind(&TDefragQuantum::Yield, this));
-
- const TActorId rewriterActorId = Register(CreateDefragRewriter(DCtx, SelfVDiskId, SelfActorId,
- findRecords.RetrieveSnapshot(), findRecords.GetRecordsToRewrite()));
- THolder<TEvDefragRewritten::THandle> ev;
- try {
- ev = WaitForSpecificEvent<TEvDefragRewritten>();
- } catch (const TPoisonPillException&) {
- Send(new IEventHandle(TEvents::TSystem::Poison, 0, rewriterActorId, {}, nullptr, 0));
- throw;
- } catch (const TExTimeout&) {
- Send(new IEventHandle(TEvents::TSystem::Poison, 0, rewriterActorId, {}, nullptr, 0));
- throw;
- }
- stat.RewrittenRecs = ev->Get()->RewrittenRecs;
- stat.RewrittenBytes = ev->Get()->RewrittenBytes;
- } catch (const TExTimeout&) {
- Send(DCtx->HugeKeeperId, new TEvHugeUnlockChunks(std::move(lockedChunks)));
- STLOG(PRI_ERROR, BS_VDISK_DEFRAG, BSVDD06, VDISKP(DCtx->VCtx->VDiskLogPrefix, "defrag worker timed out"));
+ TDefragQuantumFindRecords findRecords(std::move(*ChunksToDefrag), DCtx->VCtx->Top->GType); // the finder no longer receives a snapshot at construction
+ while (findRecords.Scan(NDefrag::WorkQuantum, GetSnapshot())) { // GetSnapshot() is evaluated again for every pass
+ Yield();
}
+ const TActorId rewriterActorId = Register(CreateDefragRewriter(DCtx, SelfVDiskId, SelfActorId,
+ findRecords.GetRecordsToRewrite())); // only the record list is handed over, no snapshot
+ THolder<TEvDefragRewritten::THandle> ev;
try {
- Compact();
-
- auto hugeStat = GetHugeStat();
- Y_VERIFY(hugeStat.LockedChunks.size() < 100);
- } catch (const TExTimeout&) {
- // ignore timeout
+ ev = WaitForSpecificEvent<TEvDefragRewritten>();
+ } catch (const TPoisonPillException&) { // pass the poison on to the rewriter actor, then rethrow
+ Send(new IEventHandle(TEvents::TSystem::Poison, 0, rewriterActorId, {}, nullptr, 0));
+ throw;
}
+ stat.RewrittenRecs = ev->Get()->RewrittenRecs;
+ stat.RewrittenBytes = ev->Get()->RewrittenBytes;
+
+ Compact(); // no longer wrapped in a try/catch for TExTimeout
+
+ auto hugeStat = GetHugeStat();
+ Y_VERIFY(hugeStat.LockedChunks.size() < 100);
}
Send(ParentActorId, new TEvDefragQuantumResult(std::move(stat)));
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp
index 71f47d5183..83bcfa1572 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp
@@ -1,4 +1,5 @@
#include "defrag_rewriter.h"
+#include <ydb/core/blobstorage/vdisk/skeleton/blobstorage_takedbsnap.h>
namespace NKikimr {
@@ -13,7 +14,7 @@ namespace NKikimr {
const TVDiskID SelfVDiskId;
const TActorId NotifyId;
// we can rewrite data while we are holding snapshot for data being read
- THullDsSnap FullSnap;
+ std::optional<THullDsSnap> FullSnap; // filled when a TEvTakeHullSnapshotResult arrives, reset when the TEvChunkReadResult is handled
std::vector<TDefragRecord> Recs;
size_t RecToReadIdx = 0;
size_t RewrittenRecsCounter = 0;
@@ -26,19 +27,55 @@ namespace NKikimr {
void SendNextRead(const TActorContext &ctx) {
if (RecToReadIdx < Recs.size()) {
- const TDiskPart &p = Recs[RecToReadIdx].OldDiskPart;
+ ctx.Send(DCtx->SkeletonId, new TEvTakeHullSnapshot(false)); // ask for a snapshot first; the read itself is issued from Handle(TEvTakeHullSnapshotResult)
+ } else if (RewrittenRecsCounter == Recs.size()) { // every record has been accounted for
+ ctx.Send(NotifyId, new TEvDefragRewritten(RewrittenRecsCounter, RewrittenBytes)); // report the totals to NotifyId
+ Die(ctx);
+ }
+ }
+
+ // Handles the snapshot requested by SendNextRead for Recs[RecToReadIdx]: looks the blob up again,
+ // determines where the part is stored now and issues a chunk read for it. The read is issued only
+ // when the part is still at the location stored in the record (OldDiskPart); in every other case
+ // the record is counted as processed and the next one is started.
+ void Handle(TEvTakeHullSnapshotResult::TPtr ev, const TActorContext& ctx) {
+ FullSnap.emplace(std::move(ev->Get()->Snap));
+ FullSnap->BlocksSnap.Destroy(); // only LogoBlobsSnap is used below
+ FullSnap->BarriersSnap.Destroy();
+
+ TLogoBlobsSnapshot::TForwardIterator iter(FullSnap->HullCtx, &FullSnap->LogoBlobsSnap);
+ const TLogoBlobID id = Recs[RecToReadIdx].LogoBlobId;
+ iter.Seek(id.FullID());
+ if (iter.Valid() && iter.GetCurKey().LogoBlobID() == id.FullID()) {
+ // Receives the disk parts of the merged record and remembers the one that belongs to PartId.
+ struct TCallback {
+ const ui32 PartId;
+ TDiskPart Location;
+
+ void operator ()(const TDiskPart& data, const NMatrix::TVectorType v) {
+ if (v.Get(PartId - 1)) {
+ Y_VERIFY(v == NMatrix::TVectorType::MakeOneHot(PartId - 1, v.GetSize()));
+ Location = data;
+ }
+ }
+
+ void operator ()(const TDiskBlob&) {}
+ } callback{id.PartId(), {}};
+ TRecordMergerCallback<TKeyLogoBlob, TMemRecLogoBlob, TCallback> merger(&callback, DCtx->VCtx->Top->GType);
+ iter.PutToMerger(&merger);
+ merger.Finish();
+
+ const TDiskPart &p = callback.Location;
+ const TDiskPart &expected = Recs[RecToReadIdx].OldDiskPart;
+ // Location stays value-initialized when the callback never saw this part, and it differs from
+ // OldDiskPart when the part has been moved since the record was selected; reading from either
+ // would fetch the wrong bytes, so both cases fall through to the skip path below.
+ if (p.ChunkIdx == expected.ChunkIdx && p.Offset == expected.Offset && p.Size == expected.Size) {
auto msg = std::make_unique<NPDisk::TEvChunkRead>(DCtx->PDiskCtx->Dsk->Owner,
DCtx->PDiskCtx->Dsk->OwnerRound, p.ChunkIdx, p.Offset, p.Size, NPriRead::HullComp, nullptr);
ctx.Send(DCtx->PDiskCtx->PDiskId, msg.release());
DCtx->DefragMonGroup.DefragBytesRewritten() += p.Size;
RewrittenBytes += p.Size;
+ return; // processing continues in Handle(NPDisk::TEvChunkReadResult)
+ }
- } else if (RewrittenRecsCounter == Recs.size()) {
- ctx.Send(NotifyId, new TEvDefragRewritten(RewrittenRecsCounter, RewrittenBytes));
- Die(ctx);
}
+ // nothing to read for this record: account for it and go on with the next one
+ ++RecToReadIdx;
+ ++RewrittenRecsCounter;
+ SendNextRead(ctx);
}
void Handle(NPDisk::TEvChunkReadResult::TPtr ev, const TActorContext& ctx) {
+ FullSnap.reset(); // drop the snapshot as soon as the read result has arrived
+
// FIXME: handle read errors gracefully
CHECK_PDISK_RESPONSE(DCtx->VCtx, ev, ctx);
@@ -84,6 +121,7 @@ namespace NKikimr {
STRICT_STFUNC(StateFunc,
HFunc(TEvents::TEvPoisonPill, HandlePoison)
+ HFunc(TEvTakeHullSnapshotResult, Handle) // routes snapshot results to Handle(TEvTakeHullSnapshotResult)
HFunc(NPDisk::TEvChunkReadResult, Handle)
HFunc(TEvBlobStorage::TEvVPutResult, Handle)
)
@@ -99,19 +137,15 @@ namespace NKikimr {
const std::shared_ptr<TDefragCtx> &dCtx,
const TVDiskID &selfVDiskId,
const TActorId &notifyId,
- THullDsSnap &&fullSnap,
std::vector<TDefragRecord> &&recs)
: TActorBootstrapped<TDefragRewriter>()
, DCtx(dCtx)
, SelfVDiskId(selfVDiskId)
, NotifyId(notifyId)
- , FullSnap(std::move(fullSnap))
, Recs(std::move(recs))
{}
};
-
-
////////////////////////////////////////////////////////////////////////////
// VDISK DEFRAG REWRITER
// Rewrites selected huge blobs to free up some Huge Heap chunks
@@ -120,9 +154,8 @@ namespace NKikimr {
const std::shared_ptr<TDefragCtx> &dCtx,
const TVDiskID &selfVDiskId,
const TActorId &notifyId,
- THullDsSnap &&fullSnap,
std::vector<TDefragRecord> &&recs) {
- return new TDefragRewriter(dCtx, selfVDiskId, notifyId, std::move(fullSnap), std::move(recs));
+ return new TDefragRewriter(dCtx, selfVDiskId, notifyId, std::move(recs)); // snapshot argument dropped together with the fullSnap parameter
}
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h
index 2f9e2a79db..dba032d87a 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h
@@ -37,7 +37,6 @@ namespace NKikimr {
const std::shared_ptr<TDefragCtx> &dCtx,
const TVDiskID &selfVDiskId,
const TActorId &notifyId,
- THullDsSnap &&fullSnap,
std::vector<TDefragRecord> &&recs);
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_search.h b/ydb/core/blobstorage/vdisk/defrag/defrag_search.h
index 5578d0c240..9fccb8afe8 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_search.h
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_search.h
@@ -8,6 +8,11 @@
namespace NKikimr {
+ namespace NDefrag { // time limits shared by the defrag actor, quantum and search code
+ static constexpr TDuration MaxSnapshotHoldDuration = TDuration::Seconds(30); // limit used for the scan in defrag_actor.cpp and for the chunk search in defrag_quantum.cpp
+ static constexpr TDuration WorkQuantum = TDuration::MilliSeconds(10); // time slice handed to each Scan() call in defrag_quantum.cpp
+ }
+
struct THugeBlobRecord {
TDiskPart Part;
TLogoBlobID Id;
@@ -314,19 +319,16 @@ namespace NKikimr {
using TLevelSegment = ::NKikimr::TLevelSegment<TKeyLogoBlob, TMemRecLogoBlob>;
using TLevelSstPtr = typename TLevelSegment::TLevelSstPtr;
- THullDsSnap FullSnap;
TChunksToDefrag ChunksToDefrag;
std::unordered_set<ui32> Chunks; // chunks to defrag (i.e. move all data from these chunks)
std::vector<TDefragRecord> RecsToRewrite;
- TDuration MaxTime;
- std::function<void()> Yield;
- ui64 EndTime;
- ui32 Count;
+ std::optional<TLogoBlobID> NextId; // key at which the next Scan() call resumes; unset until a Scan() call stops early
+ const TBlobStorageGroupType GType; // passed to GetIngress().LocalParts() in Update()
public:
- TDefragQuantumFindRecords(THullDsSnap&& fullSnap, TChunksToDefrag&& chunksToDefrag)
- : FullSnap(std::move(fullSnap))
- , ChunksToDefrag(std::move(chunksToDefrag))
+ TDefragQuantumFindRecords(TChunksToDefrag&& chunksToDefrag, TBlobStorageGroupType gtype) // the snapshot is supplied per Scan() call instead of here
+ : ChunksToDefrag(std::move(chunksToDefrag))
+ , GType(gtype)
{
for (const auto& chunk : ChunksToDefrag.Chunks) {
Chunks.insert(chunk.ChunkId);
@@ -336,28 +338,50 @@ namespace NKikimr {
RecsToRewrite.reserve(ChunksToDefrag.EstimatedSlotsCount);
}
- template<typename T>
- void Scan(TDuration maxTime, T&& yield) {
- MaxTime = maxTime;
- Yield = yield;
- EndTime = GetCycleCountFast() + DurationToCycles(MaxTime);
- Count = 0;
- TraverseDbWithoutMerge(FullSnap.HullCtx, this, FullSnap.LogoBlobsSnap);
+ bool Scan(TDuration quota, THullDsSnap fullSnap) { // returns true when it stopped on the time quota with records left, i.e. another call is needed
+ // create iterator and set it up to point to next blob of interest
+ TLogoBlobsSnapshot::TForwardIterator iter(fullSnap.HullCtx, &fullSnap.LogoBlobsSnap);
+ if (NextId) {
+ iter.Seek(*NextId); // resume at the key saved by the previous call
+ } else {
+ iter.SeekToFirst(); // nothing saved yet: start from the beginning
+ }
+
+ // calculate timestamp to finish scanning
+ const ui64 endTime = GetCycleCountFast() + DurationToCycles(quota);
+ for (ui32 count = 0; iter.Valid(); iter.Next()) {
+ if (++count % 1024 == 0 && GetCycleCountFast() >= endTime) { // the clock is consulted only on every 1024th record
+ break;
+ }
+ iter.PutToMerger(this); // presumably invokes AddFromFresh/AddFromSegment on this object; confirm against the iterator
+ }
+
+ if (iter.Valid()) {
+ NextId.emplace(iter.GetCurKey().LogoBlobID()); // the record under the iterator has not been passed to the merger yet
+ }
+ return iter.Valid();
}
- void UpdateFresh(const char* /*segName*/, const TKeyLogoBlob& key, const TMemRecLogoBlob& memRec) {
+ void AddFromFresh(const TMemRecLogoBlob& memRec, const TRope* /*data*/, const TKeyLogoBlob& key, ui64 /*lsn*/) { // no outbound table on this path: Update() gets nullptr
Update(key, memRec, nullptr);
}
- void UpdateLevel(const TLevelSstPtr& p, const TKeyLogoBlob& key, const TMemRecLogoBlob& memRec) {
- Update(key, memRec, p.SstPtr->GetOutbound());
+ void AddFromSegment(const TMemRecLogoBlob& memRec, const TDiskPart *outbound, const TKeyLogoBlob& key, ui64 /*circaLsn*/) { // forwards the supplied outbound pointer to Update()
+ Update(key, memRec, outbound);
}
+ static constexpr bool HaveToMergeData() { return false; }
+
+ std::vector<TDefragRecord> GetRecordsToRewrite() { // moves the collected records out; RecsToRewrite is left in a moved-from state
+ return std::move(RecsToRewrite);
+ }
+
+ private:
void Update(const TKeyLogoBlob& key, const TMemRecLogoBlob& memRec, const TDiskPart *outbound) {
TDiskDataExtractor extr;
if (memRec.GetType() == TBlobType::HugeBlob || memRec.GetType() == TBlobType::ManyHugeBlobs) {
memRec.GetDiskData(&extr, outbound);
- const NMatrix::TVectorType local = memRec.GetIngress().LocalParts(FullSnap.HullCtx->VCtx->Top->GType);
+ const NMatrix::TVectorType local = memRec.GetIngress().LocalParts(GType); // group type now comes from the member instead of the snapshot's HullCtx
ui8 partIdx = local.FirstPosition();
for (const TDiskPart *p = extr.Begin; p != extr.End; ++p, partIdx = local.NextPosition(partIdx)) {
Y_VERIFY(partIdx != local.GetSize());
@@ -373,20 +397,6 @@ namespace NKikimr {
}
}
}
- if (++Count % 1024 == 0 && GetCycleCountFast() >= EndTime) {
- Yield();
- EndTime = GetCycleCountFast() + DurationToCycles(MaxTime);
- }
- }
-
- void Finish() {}
-
- THullDsSnap RetrieveSnapshot() {
- return std::move(FullSnap);
- }
-
- std::vector<TDefragRecord> GetRecordsToRewrite() {
- return std::move(RecsToRewrite);
}
};