diff options
author | alexvru <alexvru@ydb.tech> | 2022-11-18 11:56:10 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-11-18 11:56:10 +0300 |
commit | 551ae898cac3eee67cb5de921f630f485968e470 (patch) | |
tree | b8d0c7083af72c33b7591b925c8d0989e79fcbe2 | |
parent | 0c7d05761ea0a649f947749448b1eb1bb812fc93 (diff) | |
download | ydb-551ae898cac3eee67cb5de921f630f485968e470.tar.gz |
Improve PhantomCheck logic
9 files changed, 223 insertions, 152 deletions
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h index 118cc062cad..3e5b5edc9ba 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h @@ -200,19 +200,16 @@ public: COUNTER_INIT(ReplHugeBlobsRecovered, true); COUNTER_INIT(ReplHugeBlobBytesRecovered, true); COUNTER_INIT(ReplChunksWritten, true); - COUNTER_INIT(ReplCurrentUnreplicatedParts, false); - COUNTER_INIT(ReplCurrentUnreplicatedBytes, false); - COUNTER_INIT(ReplCurrentPhantoms, false); - COUNTER_INIT(ReplUnreplicatedParts, false); - COUNTER_INIT(ReplUnreplicatedBytes, false); - COUNTER_INIT(ReplPhantoms, false); COUNTER_INIT(ReplUnreplicatedVDisks, false); - COUNTER_INIT(ReplUnreplicatedBlobs, false); COUNTER_INIT(ReplVGetBytesReceived, true); - COUNTER_INIT(ReplCurrentNumUnrecoveredPhantomBlobs, false); - COUNTER_INIT(ReplCurrentNumUnrecoveredNonPhantomBlobs, false); - COUNTER_INIT(ReplNumUnrecoveredPhantomBlobs, false); - COUNTER_INIT(ReplNumUnrecoveredNonPhantomBlobs, false); + COUNTER_INIT(ReplPhantomLikeDiscovered, false); + COUNTER_INIT(ReplPhantomLikeRecovered, false); + COUNTER_INIT(ReplPhantomLikeUnrecovered, false); + COUNTER_INIT(ReplPhantomLikeDropped, false); + COUNTER_INIT(ReplWorkUnitsDone, false); + COUNTER_INIT(ReplWorkUnitsRemaining, false); + COUNTER_INIT(ReplUnreplicatedPhantoms, false); + COUNTER_INIT(ReplUnreplicatedNonPhantoms, false); } COUNTER_DEF(SyncerVSyncMessagesSent); @@ -222,19 +219,16 @@ public: COUNTER_DEF(ReplHugeBlobsRecovered); COUNTER_DEF(ReplHugeBlobBytesRecovered); COUNTER_DEF(ReplChunksWritten); - COUNTER_DEF(ReplCurrentUnreplicatedParts); - COUNTER_DEF(ReplCurrentUnreplicatedBytes); - COUNTER_DEF(ReplCurrentPhantoms); - COUNTER_DEF(ReplUnreplicatedParts); - COUNTER_DEF(ReplUnreplicatedBytes); - COUNTER_DEF(ReplPhantoms); COUNTER_DEF(ReplUnreplicatedVDisks); - COUNTER_DEF(ReplUnreplicatedBlobs); COUNTER_DEF(ReplVGetBytesReceived); - COUNTER_DEF(ReplCurrentNumUnrecoveredPhantomBlobs); - COUNTER_DEF(ReplCurrentNumUnrecoveredNonPhantomBlobs); - COUNTER_DEF(ReplNumUnrecoveredPhantomBlobs); - COUNTER_DEF(ReplNumUnrecoveredNonPhantomBlobs); + COUNTER_DEF(ReplPhantomLikeDiscovered); + COUNTER_DEF(ReplPhantomLikeRecovered); + COUNTER_DEF(ReplPhantomLikeUnrecovered); + COUNTER_DEF(ReplPhantomLikeDropped); + COUNTER_DEF(ReplWorkUnitsDone); + COUNTER_DEF(ReplWorkUnitsRemaining); + COUNTER_DEF(ReplUnreplicatedPhantoms); + COUNTER_DEF(ReplUnreplicatedNonPhantoms); }; /////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp index 4f6287387d6..19ed6686f96 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp @@ -42,6 +42,7 @@ namespace NKikimr { TIntrusivePtr<TBlobStorageGroupInfo> GInfo; TActorId Recipient; TLogoBlobID StartKey; + std::optional<TLogoBlobID> ReplyKey; TEvReplFinished::TInfoPtr ReplInfo; TBlobIdQueuePtr BlobsToReplicatePtr; TBlobIdQueuePtr UnreplicatedBlobsPtr; @@ -72,6 +73,7 @@ namespace NKikimr { // create iterator for the logoblobs metabase TLogoBlobsSnapshot::TIndexForwardIterator it(snap.HullCtx, &snap.LogoBlobsSnap); bool eof = false; + bool resume = false; const ui64 plannedEndTime = GetCycleCountFast() + DurationToCycles(ReplCtx->VDiskCfg->ReplPlanQuantum); auto going = [&, first = true]() mutable { // the predicate that determines the length of the quantum @@ -86,13 +88,18 @@ namespace NKikimr { if (BlobsToReplicatePtr) { // iterate over queue items and match them with iterator - for (; !BlobsToReplicatePtr->empty() && going(); BlobsToReplicatePtr->pop()) { + for (; !BlobsToReplicatePtr->empty() && going(); BlobsToReplicatePtr->pop_front()) { const TLogoBlobID& key = BlobsToReplicatePtr->front(); it.Seek(key); if (it.Valid() && it.GetCurKey().LogoBlobID() == key) { ProcessItem(it, barriers, allowKeepFlags); } } + if (RecoveryMachine->FullOfTasks() || QuantumBytes >= ReplCtx->VDiskCfg->ReplMaxQuantumBytes) { + for (const TLogoBlobID& id : *BlobsToReplicatePtr) { + ReplInfo->WorkUnitsTotal += id.BlobSize(); + } + } eof = BlobsToReplicatePtr->empty(); } else { // scan through the index until we have enough blobs to recover or the time is out @@ -101,18 +108,35 @@ namespace NKikimr { } if (it.Valid()) { StartKey = it.GetCurKey().LogoBlobID(); // we gonna resume later starting from this key + if (!ReplyKey) { + ReplyKey = StartKey; + } + + const TBlobStorageGroupInfo::TTopology& topology = *ReplCtx->VCtx->Top; + ui32 counter = 0; + for (; it.Valid(); it.Next()) { + if (++counter % 1024 == 0 && GetCycleCountFast() > plannedEndTime) { + resume = true; + break; + } + + const TLogoBlobID key = it.GetCurKey().LogoBlobID(); + const TMemRecLogoBlob memRec = it.GetMemRec(); + const TIngress ingress = memRec.GetIngress(); + const auto parts = ingress.PartsWeMustHaveLocally(&topology, ReplCtx->VCtx->ShortSelfVDisk, + key) - ingress.LocalParts(topology.GType); + if (!parts.Empty()) { + ReplInfo->WorkUnitsTotal += key.BlobSize(); + } + } } else { eof = true; } } - if (eof || RecoveryMachine->FullOfTasks() || QuantumBytes >= ReplCtx->VDiskCfg->ReplMaxQuantumBytes) { - // adjust counters - ReplCtx->MonGroup.ReplCurrentUnreplicatedParts() += QuantumParts; - ReplCtx->MonGroup.ReplCurrentUnreplicatedBytes() += QuantumBytes; - + if (!resume && (eof || RecoveryMachine->FullOfTasks() || QuantumBytes >= ReplCtx->VDiskCfg->ReplMaxQuantumBytes)) { // the planning stage has finished, issue reply to the job actor - Send(Recipient, new TEvReplPlanFinished(std::move(RecoveryMachine), StartKey, eof)); + Send(Recipient, new TEvReplPlanFinished(std::move(RecoveryMachine), ReplyKey.value_or(TLogoBlobID()), eof)); // finish processing for this actor PassAway(); @@ -138,19 +162,30 @@ namespace NKikimr { NMatrix::TVectorType parts = ingress.PartsWeMustHaveLocally(&topology, ReplCtx->VCtx->ShortSelfVDisk, key) - ingress.LocalParts(topology.GType); - // scan for metadata parts - for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) { - const TLogoBlobID id(key, i + 1); - if (!gtype.PartSize(id)) { - parts.Clear(i); - RecoveryMachine->AddMetadataPart(id); + if (!parts.Empty()) { + // scan for metadata parts + for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) { + const TLogoBlobID id(key, i + 1); + if (!gtype.PartSize(id)) { + parts.Clear(i); + RecoveryMachine->AddMetadataPart(id); + } } - } - if (!parts.Empty()) { const bool phantomLike = !status.KeepByBarrier && ReplInfo->DonorVDiskId == TVDiskID(); RecoveryMachine->AddTask(key, parts, phantomLike, ingress); + ReplInfo->WorkUnitsPlanned += key.BlobSize(); + ReplInfo->WorkUnitsTotal += key.BlobSize(); + ReplInfo->PhantomLike += phantomLike; + + if (phantomLike) { + ++ReplCtx->MonGroup.ReplPhantomLikeDiscovered(); + ReplCtx->MonGroup.ReplUnreplicatedPhantoms() = 1; + } else { + ReplCtx->MonGroup.ReplUnreplicatedNonPhantoms() = 1; + } + // calculate part size and total size to recover for (ui8 partIdx = parts.FirstPosition(); partIdx != parts.GetSize(); partIdx = parts.NextPosition(partIdx)) { QuantumBytes += gtype.PartSize(TLogoBlobID(key, partIdx + 1)); @@ -236,7 +271,10 @@ namespace NKikimr { TVDiskProxySet DiskProxySet; ui32 NumRunningProxies = 0; - bool PhantomCheckPending = false; + using TPhantomCheck = std::tuple<TLogoBlobID, NMatrix::TVectorType>; + std::deque<TPhantomCheck> PhantomChecksPending; + std::unordered_multimap<ui64, TPhantomCheck> PhantomChecksInFlight; + ui32 LastPhantomCheckId = 0; TDeque<TLogoBlobID> Phantoms; THashSet<TChunkIdx> WrittenChunkIdxSet; @@ -252,10 +290,15 @@ namespace NKikimr { (LastKey, LastKey), (Eof, Eof)); if (!Phantoms.empty()) { + // remove all determined phantom blobs from the UnreplicatedBlobsPtr queue + std::sort(Phantoms.begin(), Phantoms.end()); + auto pred = [this](const auto& id) { return std::binary_search(Phantoms.begin(), Phantoms.end(), id); }; + UnreplicatedBlobsPtr->erase(std::remove_if(UnreplicatedBlobsPtr->begin(), UnreplicatedBlobsPtr->end(), + pred), UnreplicatedBlobsPtr->end()); + STLOG(PRI_DEBUG, BS_REPL, BSVR06, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "sending phantoms"), (NumPhantoms, Phantoms.size())); - Send(ReplCtx->SkeletonId, new TEvDetectedPhantomBlob(std::move(Phantoms))); - Phantoms.clear(); + Send(ReplCtx->SkeletonId, new TEvDetectedPhantomBlob(std::exchange(Phantoms, {}))); } bool dropDonor = true; @@ -296,6 +339,12 @@ namespace NKikimr { LastKey = ev->Get()->LastKey; Eof = ev->Get()->Eof; + auto& mon = ReplCtx->MonGroup; + mon.ReplPhantomLikeDiscovered() += ReplInfo->PhantomLike; + Y_VERIFY_DEBUG_S(mon.ReplWorkUnitsRemaining() == 1 || ReplInfo->WorkUnitsTotal <= (ui64)mon.ReplWorkUnitsRemaining(), + "WorkUnitsTotal# " << ReplInfo->WorkUnitsTotal << " ReplWorkUnitsRemaining# " << mon.ReplWorkUnitsRemaining()); + mon.ReplWorkUnitsRemaining() = ReplInfo->WorkUnitsTotal; + if (RecoveryMachine->NoTasks()) { Finish(); return; @@ -473,9 +522,9 @@ namespace NKikimr { return false; } - if (PhantomCheckPending) { - return false; // still waiting for proxy response about phantom validation - } + { Y_DEFER { + RunPhantomChecks(); + }; while (!MergeHeap.empty()) { TimeAccount.SetState(ETimeState::MERGE); @@ -544,21 +593,11 @@ namespace NKikimr { } // recover data - TRecoveryMachine::EPhantomState phantom = TRecoveryMachine::EPhantomState::Unknown; - RecoveryMachine->Recover(*CurrentKey, *CurrentParts, RecoveryQueue, phantom); - if (phantom == TRecoveryMachine::EPhantomState::Check) { + NMatrix::TVectorType parts; + if (!RecoveryMachine->Recover(*CurrentKey, *CurrentParts, RecoveryQueue, parts)) { STLOG(PRI_INFO, BS_REPL, BSVR33, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "Sending phantom validation query"), (GroupId, GInfo->GroupID), (CurKey, *CurrentKey)); - - auto ev = std::make_unique<TEvBlobStorage::TEvGet>(*CurrentKey, 0, 0, TInstant::Max(), - NKikimrBlobStorage::EGetHandleClass::AsyncRead); - ev->PhantomCheck = true; - SendToBSProxy(SelfId(), GInfo->GroupID, ev.release()); - - PhantomCheckPending = true; - - TimeAccount.SetState(ETimeState::PHANTOM); - return false; + PhantomChecksPending.emplace_back(*CurrentKey, parts); } CurrentKey.reset(); CurrentParts.reset(); @@ -580,6 +619,14 @@ namespace NKikimr { } } + } // Y_DEFER + + if (!PhantomChecksInFlight.empty()) { + TimeAccount.SetState(ETimeState::PHANTOM); + return false; // still waiting for proxy response about phantom validation + } + Y_VERIFY(PhantomChecksPending.empty()); + Y_VERIFY(!NumRunningProxies && MergeHeap.empty() && RecoveryQueue.empty()); TimeAccount.SetState(ETimeState::OTHER); @@ -614,27 +661,57 @@ namespace NKikimr { Y_FAIL("incorrect merger state State# %" PRIu32, ui32(Writer.GetState())); } + void RunPhantomChecks() { + while (!PhantomChecksPending.empty() && PhantomChecksInFlight.size() < 32) { + const ui64 cookie = ++LastPhantomCheckId; + + size_t numItems = Min<size_t>(PhantomChecksPending.size(), 32); + TArrayHolder<TEvBlobStorage::TEvGet::TQuery> queries(new TEvBlobStorage::TEvGet::TQuery[numItems]); + for (size_t i = 0; i < numItems; ++i) { + auto& pending = PhantomChecksPending.front(); + auto& [id, parts] = pending; + queries[i].Set(id); + PhantomChecksInFlight.emplace(cookie, pending); + PhantomChecksPending.pop_front(); + } + + auto ev = std::make_unique<TEvBlobStorage::TEvGet>(queries, numItems, TInstant::Max(), + NKikimrBlobStorage::EGetHandleClass::AsyncRead); + ev->PhantomCheck = true; + SendToBSProxy(SelfId(), GInfo->GroupID, ev.release(), cookie); + } + } + void Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { STLOG(PRI_INFO, BS_REPL, BSVR34, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "Received phantom validation reply"), (Msg, ev->Get()->ToString())); - Y_VERIFY(PhantomCheckPending); - Y_VERIFY(CurrentKey); - Y_VERIFY(CurrentParts); - TRecoveryMachine::EPhantomState phantom = TRecoveryMachine::EPhantomState::NonPhantom; + + auto [begin, end] = PhantomChecksInFlight.equal_range(ev->Cookie); + Y_VERIFY(begin != end); + + std::unordered_map<TLogoBlobID, bool> isPhantom; auto *msg = ev->Get(); - if (msg->Status == NKikimrProto::OK) { - Y_VERIFY(msg->ResponseSz == 1); - auto& r = msg->Responses[0]; - Y_VERIFY(r.Id == *CurrentKey); - if (r.Status == NKikimrProto::NODATA) { - Phantoms.push_back(r.Id); - phantom = TRecoveryMachine::EPhantomState::Phantom; + for (size_t i = 0; i < msg->ResponseSz; ++i) { + auto& r = msg->Responses[i]; + isPhantom.emplace(r.Id, r.Status == NKikimrProto::NODATA); + } + + for (auto it = begin; it != end; ++it) { + const auto& [_, item] = *it; + const auto& [id, parts] = item; + const auto isPhantomIt = isPhantom.find(id); + Y_VERIFY(isPhantomIt != isPhantom.end()); + const bool phantom = isPhantomIt->second; + isPhantom.erase(isPhantomIt); + RecoveryMachine->ProcessPhantomBlob(id, parts, phantom); + if (phantom) { + Phantoms.push_back(id); } } - RecoveryMachine->Recover(*CurrentKey, *CurrentParts, RecoveryQueue, phantom); - PhantomCheckPending = false; - CurrentKey.reset(); - CurrentParts.reset(); + + PhantomChecksInFlight.erase(begin, end); + Y_VERIFY(isPhantom.empty()); + Merge(); } diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp index 34be219abce..e23438fa3ce 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp @@ -71,6 +71,10 @@ namespace NKikimr { PARAM_V(ReplicaOk); PARAM_V(RecoveryScheduled); PARAM_V(IgnoredDueToGC); + PARAM_V(WorkUnitsPlanned); + PARAM_V(WorkUnitsTotal); + PARAM_V(WorkUnitsProcessed); + PARAM_V(PhantomLike); } GROUP("Plan Execution Stats") { PARAM_V(DataRecoverySuccess); @@ -250,12 +254,9 @@ namespace NKikimr { STLOG(PRI_DEBUG, BS_REPL, BSVR15, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "QUANTUM START")); LastReplStart = TAppData::TimeProvider->Now(); - ReplCtx->MonGroup.ReplCurrentUnreplicatedParts() = 0; - ReplCtx->MonGroup.ReplCurrentUnreplicatedBytes() = 0; - ReplCtx->MonGroup.ReplCurrentPhantoms() = 0; - ReplCtx->MonGroup.ReplCurrentNumUnrecoveredPhantomBlobs() = 0; - ReplCtx->MonGroup.ReplCurrentNumUnrecoveredNonPhantomBlobs() = 0; ReplCtx->MonGroup.ReplUnreplicatedVDisks() = 1; + ReplCtx->MonGroup.ReplWorkUnitsRemaining() = 1; + ReplCtx->MonGroup.ReplWorkUnitsDone() = 0; Become(&TThis::StateRepl); @@ -342,7 +343,6 @@ namespace NKikimr { bool finished = false; if (info->Eof) { // when it is the last quantum for some donor, rotate the blob sets - ReplCtx->MonGroup.ReplUnreplicatedBlobs() = UnreplicatedBlobsPtr->size(); BlobsToReplicatePtr = std::move(UnreplicatedBlobsPtr); UnreplicatedBlobsPtr = std::make_shared<TBlobIdQueue>(); if (BlobsToReplicatePtr->empty()) { @@ -371,10 +371,6 @@ namespace NKikimr { STLOG(PRI_DEBUG, BS_REPL, BSVR17, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "REPL COMPLETED"), (BlobsToReplicate, BlobsToReplicatePtr->size())); LastReplEnd = now; - ReplCtx->MonGroup.ReplUnreplicatedBytes() = ReplCtx->MonGroup.ReplCurrentUnreplicatedBytes(); - ReplCtx->MonGroup.ReplPhantoms() = ReplCtx->MonGroup.ReplCurrentPhantoms(); - ReplCtx->MonGroup.ReplNumUnrecoveredPhantomBlobs() = ReplCtx->MonGroup.ReplCurrentNumUnrecoveredPhantomBlobs(); - ReplCtx->MonGroup.ReplNumUnrecoveredNonPhantomBlobs() = ReplCtx->MonGroup.ReplCurrentNumUnrecoveredNonPhantomBlobs(); if (State == WaitQueues || State == Replication) { // release token as we have finished replicating @@ -382,7 +378,7 @@ namespace NKikimr { } Become(&TThis::StateRelax); - if (*BlobsToReplicatePtr) { + if (!BlobsToReplicatePtr->empty()) { // try again for unreplicated blobs in some future State = Relaxation; Schedule(ReplCtx->VDiskCfg->ReplTimeInterval, new TEvents::TEvWakeup); @@ -390,6 +386,10 @@ namespace NKikimr { // no more blobs to replicate; replication will not resume State = Finished; ReplCtx->MonGroup.ReplUnreplicatedVDisks() = 0; + ReplCtx->MonGroup.ReplUnreplicatedPhantoms() = 1; + ReplCtx->MonGroup.ReplUnreplicatedNonPhantoms() = 1; + ReplCtx->MonGroup.ReplWorkUnitsRemaining() = 0; + ReplCtx->MonGroup.ReplWorkUnitsDone() = 0; TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvReplDone, 0, ReplCtx->SkeletonId, SelfId(), nullptr, 0)); } diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h index 629d65bea97..ee6bb57e883 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h @@ -11,7 +11,7 @@ namespace NKikimr { struct TProxyStat; }; - using TBlobIdQueue = TQueue<TLogoBlobID>; + using TBlobIdQueue = std::deque<TLogoBlobID>; using TBlobIdQueuePtr = std::shared_ptr<TBlobIdQueue>; //////////////////////////////////////////////////////////////////////////// @@ -33,6 +33,10 @@ namespace NKikimr { ui64 ReplicaOk = 0; ui64 RecoveryScheduled = 0; ui64 IgnoredDueToGC = 0; + ui64 WorkUnitsPlanned = 0; + ui64 WorkUnitsTotal = 0; + ui64 WorkUnitsProcessed = 0; + ui64 PhantomLike = 0; // plan execution stats ui64 DataRecoverySuccess = 0; diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h index 3c13238ad54..9dbc1bb4f5f 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h @@ -118,16 +118,9 @@ namespace NKikimr { , Arena(&TRopeArenaBackend::Allocate) {} - enum class EPhantomState { - Unknown, - Check, - Phantom, - NonPhantom, - }; - - void Recover(const TLogoBlobID& id, TPartSet& partSet, TRecoveredBlobsQueue& rbq, EPhantomState& phantom) { + bool Recover(const TLogoBlobID& id, TPartSet& partSet, TRecoveredBlobsQueue& rbq, NMatrix::TVectorType& parts) { Y_VERIFY(!id.PartId()); - Y_VERIFY(PhantomCheckPending ? *PhantomCheckPending == id : (!LastRecoveredId || *LastRecoveredId < id)); + Y_VERIFY(!LastRecoveredId || *LastRecoveredId < id); LastRecoveredId = id; RecoverMetadata(id, rbq); @@ -140,7 +133,7 @@ namespace NKikimr { if (LostVec.empty() || LostVec.front().Id != id) { STLOG(PRI_ERROR, BS_REPL, BSVR27, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "blob not in LostVec"), (BlobId, id)); - return; + return true; } const TLost& lost = LostVec.front(); @@ -148,7 +141,7 @@ namespace NKikimr { const TBlobStorageGroupType groupType = ReplCtx->VCtx->Top->GType; - const NMatrix::TVectorType parts = lost.PartsToRecover; + parts = lost.PartsToRecover; ui32 partsSize = 0; bool hasExactParts = false; @@ -161,45 +154,28 @@ namespace NKikimr { } } - bool countAsRecovered = false; - Y_VERIFY_DEBUG((partSet.PartSet.PartsMask >> groupType.TotalPartCount()) == 0); const ui32 presentParts = PopCount(partSet.PartSet.PartsMask); bool canRestore = presentParts >= groupType.MinimalRestorablePartCount(); - if (phantom == EPhantomState::Unknown && lost.PossiblePhantom && needToRestore && !canRestore) { - phantom = EPhantomState::Check; + if (lost.PossiblePhantom && needToRestore && !canRestore) { + ReplInfo->DataRecoveryNoParts++; + ReplInfo->PartsMissing += parts.CountBits(); ++ReplInfo->DataRecoveryPhantomCheck; - PhantomCheckPending = id; - return; // reentry expected with the check result - } else { - PhantomCheckPending.reset(); + UnreplicatedBlobsPtr->push_back(id); // treat this blob as non-phantom by default, sort it out later + LostVec.pop_front(); + return false; } // first of all, count present parts and recover only if there are enough of these parts - if ((!canRestore && needToRestore && !hasExactParts) || phantom == EPhantomState::Phantom) { + if (!canRestore && needToRestore && !hasExactParts) { ReplInfo->DataRecoveryNoParts++; ReplInfo->PartsMissing += parts.CountBits(); STLOG(PRI_INFO, BS_REPL, BSVR28, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "not enough data parts to recover"), (BlobId, id), (NumPresentParts, presentParts), (MinParts, groupType.DataParts()), - (PartSet, partSet.ToString()), (IsPhantom, phantom == EPhantomState::Phantom), - (IsNonPhantom, phantom == EPhantomState::NonPhantom), (PossiblePhantom, lost.PossiblePhantom), - (Ingress, lost.Ingress.ToString(ReplCtx->VCtx->Top.get(), ReplCtx->VCtx->ShortSelfVDisk, id))); - ++(phantom == EPhantomState::Phantom - ? ReplCtx->MonGroup.ReplCurrentNumUnrecoveredPhantomBlobs() - : ReplCtx->MonGroup.ReplCurrentNumUnrecoveredNonPhantomBlobs()); - - if (phantom == EPhantomState::Phantom) { - ++ReplCtx->MonGroup.ReplCurrentPhantoms(); - - // count phantoms as replicated blobs - countAsRecovered = true; - for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) { - partsSize += groupType.PartSize(TLogoBlobID(id, i + 1)); - } - } else { - UnreplicatedBlobsPtr->push(id); - } + (PartSet, partSet.ToString()), (Ingress, lost.Ingress.ToString(ReplCtx->VCtx->Top.get(), + ReplCtx->VCtx->ShortSelfVDisk, id))); + UnreplicatedBlobsPtr->push_back(id); } else { // recover try { @@ -246,7 +222,7 @@ namespace NKikimr { if (numMissingParts) { // this blob is not fully replicated yet - UnreplicatedBlobsPtr->push(id); + UnreplicatedBlobsPtr->push_back(id); } if (numSmallParts) { @@ -259,26 +235,37 @@ namespace NKikimr { ReplInfo->HugeLogoBlobsRecovered += numHuge; ReplInfo->BytesRecovered += partsSize; ReplInfo->PartsMissing += numMissingParts; - - // count recovered parts - countAsRecovered = true; - ReplInfo->DataRecoverySuccess++; + ReplInfo->WorkUnitsProcessed += id.BlobSize(); + Y_VERIFY_DEBUG(ReplInfo->WorkUnitsProcessed <= ReplInfo->WorkUnitsPlanned); + + if (!numMissingParts) { + ReplCtx->MonGroup.ReplWorkUnitsDone() += id.BlobSize(); + ReplCtx->MonGroup.ReplWorkUnitsRemaining() -= id.BlobSize(); + } + if (lost.PossiblePhantom) { + ++ReplCtx->MonGroup.ReplPhantomLikeRecovered(); + } } catch (const std::exception& ex) { ++ReplCtx->MonGroup.ReplRecoveryGroupTypeErrors(); STLOG(PRI_ERROR, BS_REPL, BSVR29, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "recovery exception"), (BlobId, id), (Error, TString(ex.what()))); ReplInfo->DataRecoveryFailure++; - UnreplicatedBlobsPtr->push(id); + UnreplicatedBlobsPtr->push_back(id); } } - if (countAsRecovered) { - ReplCtx->MonGroup.ReplCurrentUnreplicatedBytes() -= partsSize; - ReplCtx->MonGroup.ReplCurrentUnreplicatedParts() -= parts.CountBits(); - } - LostVec.pop_front(); + return true; + } + + void ProcessPhantomBlob(const TLogoBlobID& id, NMatrix::TVectorType parts, bool isPhantom) { + STLOG(PRI_INFO, BS_REPL, BSVR00, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "phantom check completed"), + (BlobId, id), (Parts, parts), (IsPhantom, isPhantom)); + + ++(isPhantom + ? ReplCtx->MonGroup.ReplPhantomLikeDropped() + : ReplCtx->MonGroup.ReplPhantomLikeUnrecovered()); } // finish work @@ -350,7 +337,6 @@ namespace NKikimr { TDeque<TLogoBlobID> MetadataParts; TRopeArena Arena; std::optional<TLogoBlobID> LastRecoveredId; - std::optional<TLogoBlobID> PhantomCheckPending; void AddBlobToQueue(const TLogoBlobID& id, TRope blob, NMatrix::TVectorType parts, bool isHugeBlob, TRecoveredBlobsQueue& rbq) { @@ -393,7 +379,10 @@ namespace NKikimr { STLOG(PRI_INFO, BS_REPL, BSVR31, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "TRecoveryMachine::SkipItem"), (BlobId, item.Id)); ++ReplInfo->DataRecoverySkip; - UnreplicatedBlobsPtr->push(item.Id); + UnreplicatedBlobsPtr->push_back(item.Id); + if (item.PossiblePhantom) { + ++ReplCtx->MonGroup.ReplPhantomLikeUnrecovered(); + } } }; diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp index 83d376047b0..fe835e0eacb 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp @@ -86,6 +86,7 @@ namespace NKikimr { auto groupInfo = MakeIntrusive<TBlobStorageGroupInfo>(TBlobStorageGroupType::Erasure4Plus2Block); auto replCtx = CreateReplCtx(vdisks, groupInfo); auto info = MakeIntrusive<TEvReplFinished::TInfo>(); + info->WorkUnitsPlanned = Max<ui64>(); TBlobIdQueuePtr unreplicatedBlobsPtr = std::make_shared<TBlobIdQueue>(); NRepl::TRecoveryMachine m(replCtx, info, unreplicatedBlobsPtr); TMap<TLogoBlobID, TVector<TString>> data = GenerateData(10000, 1024, groupInfo, vdisks); @@ -148,9 +149,9 @@ namespace NKikimr { p.AddData(0, TLogoBlobID(id, partIndex + 1), NKikimrProto::OK, v[i]); } NRepl::TRecoveryMachine::TRecoveredBlobsQueue rbq; - NRepl::TRecoveryMachine::EPhantomState phantom = NRepl::TRecoveryMachine::EPhantomState::Unknown; - m.Recover(id, p, rbq, phantom); - Y_VERIFY(phantom == NRepl::TRecoveryMachine::EPhantomState::Unknown); + NMatrix::TVectorType parts; + const bool success = m.Recover(id, p, rbq, parts); + Y_VERIFY(success); ui8 partIndex; for (partIndex = 0; partIndex < groupInfo->Type.BlobSubgroupSize(); ++partIndex) { diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index afd133c51b0..dc85118ddfd 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -86,15 +86,13 @@ namespace NKikimr { } ctx.Send(*SkeletonFrontIDPtr, ev.release()); // send VDisk's metric to NodeWarden - if (OverloadHandler) { - ctx.Send(NodeWardenServiceId, - new TEvBlobStorage::TEvControllerUpdateDiskStatus( - SelfVDiskId, - OverloadHandler->GetIntegralRankPercent(), - SelfId().NodeId(), - Config->BaseInfo.PDiskId, - Config->BaseInfo.VDiskSlotId)); - } + ctx.Send(NodeWardenServiceId, + new TEvBlobStorage::TEvControllerUpdateDiskStatus( + SelfVDiskId, + OverloadHandler ? OverloadHandler->GetIntegralRankPercent() : 0, + SelfId().NodeId(), + Config->BaseInfo.PDiskId, + Config->BaseInfo.VDiskSlotId)); // repeat later ctx.Schedule(Config->WhiteboardUpdateInterval, new TEvTimeToUpdateWhiteboard()); } diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp index 3cd92fbe56d..354302d1d02 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp @@ -903,10 +903,8 @@ namespace NKikimr { const auto state = VDiskMonGroup.VDiskState(); // replicated? bool replicated = !ReplMonGroup.ReplUnreplicatedVDisks() && !HasUnreadableBlobs; - bool unreplicatedPhantoms = ReplMonGroup.ReplCurrentNumUnrecoveredPhantomBlobs() + - ReplMonGroup.ReplNumUnrecoveredPhantomBlobs(); - bool unreplicatedNonPhantoms = ReplMonGroup.ReplCurrentNumUnrecoveredNonPhantomBlobs() + - ReplMonGroup.ReplNumUnrecoveredNonPhantomBlobs(); + bool unreplicatedPhantoms = ReplMonGroup.ReplUnreplicatedPhantoms(); + bool unreplicatedNonPhantoms = ReplMonGroup.ReplUnreplicatedNonPhantoms(); // unsynced VDisks ui64 unsyncedVDisks = SyncerMonGroup.SyncerUnsyncedDisks(); // calculate cumulative status of Skeleton Front overload @@ -917,8 +915,16 @@ namespace NKikimr { light = Max(light, queue->GetCumulativeLight()); } // send a message to Whiteboard - ctx.Send(SelfId(), new NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate(state, outOfSpaceFlags, - replicated, unreplicatedPhantoms, unreplicatedNonPhantoms, unsyncedVDisks, light, HasUnreadableBlobs)); + auto ev = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate>(state, outOfSpaceFlags, + replicated, unreplicatedPhantoms, unreplicatedNonPhantoms, unsyncedVDisks, light, HasUnreadableBlobs); + if (ReplMonGroup.ReplUnreplicatedVDisks()) { + const ui64 a = ReplMonGroup.ReplWorkUnitsDone(); + const ui64 b = ReplMonGroup.ReplWorkUnitsRemaining(); + ev->Record.SetReplicationProgress((double)a / (a + b)); + } else { + ev->Record.SetReplicationProgress(1.0); + } + ctx.Send(SelfId(), ev.release()); // repeat later if (schedule) { ctx.Schedule(Config->WhiteboardUpdateInterval, new TEvTimeToUpdateWhiteboard); diff --git a/ydb/core/protos/node_whiteboard.proto b/ydb/core/protos/node_whiteboard.proto index b15e8b09929..4e44eea60c9 100644 --- a/ydb/core/protos/node_whiteboard.proto +++ b/ydb/core/protos/node_whiteboard.proto @@ -182,6 +182,8 @@ message TVDiskStateInfo { optional bool UnreplicatedPhantoms = 20 [default = false]; // The same for the non-phantom-like blobs. optional bool UnreplicatedNonPhantoms = 21 [default = false]; + // Replication progress (0 to 1). Only for replication, not blob scrubbing. + optional float ReplicationProgress = 30; // How many unsynced VDisks from current BlobStorage group we see optional uint64 UnsyncedVDisks = 15 [default = 0]; // How much this VDisk have allocated on corresponding PDisk |