diff options
author | alexvru <alexvru@ydb.tech> | 2023-11-15 14:46:01 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-11-15 16:06:24 +0300 |
commit | cba5bedd1aaa67cdfad2dbdcc98c8a72acd39382 (patch) | |
tree | 0cb739c2e8cbd0b02a9a8c7b2e44290c2d6e064a | |
parent | 92c4058c74bb6bf043bbb65cb375f5a34540d4b6 (diff) | |
download | ydb-cba5bedd1aaa67cdfad2dbdcc98c8a72acd39382.tar.gz |
Fix replication bug KIKIMR-20008
4 files changed, 106 insertions, 83 deletions
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp index c4a2a28b9ff..c1d4c81ffb4 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp @@ -60,7 +60,7 @@ namespace NKikimr { ReplInfo->ItemsTotal += UnreplicatedBlobsPtr->size(); // prepare the recovery machine - RecoveryMachine = std::make_unique<TRecoveryMachine>(ReplCtx, ReplInfo, std::move(UnreplicatedBlobsPtr)); + RecoveryMachine = std::make_unique<TRecoveryMachine>(ReplCtx, ReplInfo); // request for snapshot Send(ReplCtx->SkeletonId, new TEvTakeHullSnapshot(true)); @@ -322,6 +322,13 @@ namespace NKikimr { void Bootstrap() { STLOG(PRI_DEBUG, BS_REPL, BSVR02, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "THullReplJobActor::Bootstrap")); + + if (BlobsToReplicatePtr) { + for (const TLogoBlobID& id : *BlobsToReplicatePtr) { + Y_ABORT_UNLESS(UnreplicatedBlobRecords.contains(id)); + } + } + TimeAccount.SetState(ETimeState::PREPARE_PLAN); auto actor = std::make_unique<THullReplPlannerActor>(ReplCtx, GInfo, StartKey, ReplInfo, BlobsToReplicatePtr, UnreplicatedBlobsPtr); auto aid = RunInBatchPool(ActorContext(), actor.release()); @@ -336,6 +343,20 @@ namespace NKikimr { LastKey = ev->Get()->LastKey; Eof = ev->Get()->Eof; + if (BlobsToReplicatePtr) { + TUnreplicatedBlobRecords temp; + RecoveryMachine->ForEach([&](TLogoBlobID id, NMatrix::TVectorType /*partsToRecover*/, TIngress /*ingress*/) { + auto nh = UnreplicatedBlobRecords.extract(id); + Y_ABORT_UNLESS(nh); + temp.insert(std::move(nh)); + }); + for (auto it = UnreplicatedBlobRecords.begin(); it != UnreplicatedBlobRecords.end(); ) { + const auto& [key, value] = *it++; + DropUnreplicatedBlobRecord(key); + } + UnreplicatedBlobRecords = std::move(temp); + } + auto& mon = ReplCtx->MonGroup; if ((mon.ReplWorkUnitsRemaining() && ReplInfo->WorkUnitsTotal > (ui64)mon.ReplWorkUnitsRemaining()) || @@ -599,26 +620,7 @@ namespace NKikimr { } // recover data - NMatrix::TVectorType parts; - TIngress ingress; - switch (RecoveryMachine->Recover(item, RecoveryQueue, parts, ingress)) { - case TRecoveryMachine::ERecoverStatus::UNKNOWN: - Y_ABORT(); - - case TRecoveryMachine::ERecoverStatus::RESTORED: - UnreplicatedBlobRecords.erase(item.Id); - break; - - case TRecoveryMachine::ERecoverStatus::PHANTOM_CHECK: - STLOG(PRI_INFO, BS_REPL, BSVR33, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "Sending phantom validation query"), - (GroupId, GInfo->GroupID), (CurKey, item.Id)); - PhantomChecksPending.emplace_back(item.Id, parts, item, ingress); - break; - - case TRecoveryMachine::ERecoverStatus::RETRY: - AddUnreplicatedBlobRecord(item, ingress, false); - break; - } + RecoveryMachine->Recover(item, RecoveryQueue, *this); CurrentItem.reset(); // process recovered items, if any; queueProcessed.first will be false when writer is not ready for new data @@ -650,10 +652,13 @@ namespace NKikimr { TimeAccount.SetState(ETimeState::OTHER); if (!RecoveryMachineFinished) { - RecoveryMachine->Finish(RecoveryQueue); + RecoveryMachine->Finish(RecoveryQueue, *this); RecoveryMachineFinished = true; STLOG(PRI_DEBUG, BS_REPL, BSVR07, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "finished recovery machine"), (RecoveryQueueSize, RecoveryQueue.size())); + + // sort unreplicated blobs vector as it may contain records in incorrect order due to phantom checking + std::sort(UnreplicatedBlobsPtr->begin(), UnreplicatedBlobsPtr->end()); return true; } @@ -732,12 +737,9 @@ namespace NKikimr { auto node = isPhantom.extract(id); Y_ABORT_UNLESS(node); auto [phantom, looksLikePhantom] = node.mapped(); - RecoveryMachine->ProcessPhantomBlob(id, parts, phantom, looksLikePhantom); + RecoveryMachine->ProcessPhantomBlob(partSet, parts, phantom, looksLikePhantom, ingress, *this); if (phantom) { Phantoms.push_back(id); - AddUnreplicatedBlobRecord(partSet, ingress, true); - } else { - UnreplicatedBlobRecords.erase(id); } } @@ -747,7 +749,12 @@ namespace NKikimr { Merge(); } - void AddUnreplicatedBlobRecord(const TRecoveryMachine::TPartSet& item, const TIngress& ingress, bool looksLikePhantom) { + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // TRecoveryMachine interoperation + + friend class TRecoveryMachine; + + void AddUnreplicatedBlobRecord(const TRecoveryMachine::TPartSet& item, TIngress ingress, bool looksLikePhantom) { TUnreplicatedBlobRecord record{ingress, item.PartsMask, item.DisksRepliedOK, item.DisksRepliedNODATA, item.DisksRepliedNOT_YET, item.DisksRepliedOther, looksLikePhantom}; const auto [it, inserted] = UnreplicatedBlobRecords.try_emplace(item.Id, record); @@ -762,6 +769,7 @@ namespace NKikimr { } else if (record.LooksLikePhantom) { ++ReplCtx->MonGroup.ReplPhantomBlobsWithProblems(); } + UnreplicatedBlobsPtr->push_back(item.Id); } void DropUnreplicatedBlobRecord(const TLogoBlobID& id) { @@ -774,6 +782,12 @@ namespace NKikimr { } } + void AddPhantomBlobRecord(const TRecoveryMachine::TPartSet& item, TIngress ingress, NMatrix::TVectorType partsToRecover) { + STLOG(PRI_INFO, BS_REPL, BSVR33, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "Sending phantom validation query"), + (GroupId, GInfo->GroupID), (CurKey, item.Id)); + PhantomChecksPending.emplace_back(item.Id, partsToRecover, item, ingress); + } + EProcessQueueAction ProcessQueue() { while (!RecoveryQueue.empty()) { auto& front = RecoveryQueue.front(); diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp index d32fc058baa..a4ce0107c9f 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp @@ -351,10 +351,19 @@ namespace NKikimr { if (info->Eof) { // when it is the last quantum for some donor, rotate the blob sets BlobsToReplicatePtr = std::move(UnreplicatedBlobsPtr); UnreplicatedBlobsPtr = std::make_shared<TBlobIdQueue>(); + +#ifndef NDEBUG + Y_VERIFY_DEBUG_S(BlobsToReplicatePtr->size() == UnreplicatedBlobRecords.size(), + "BlobsToReplicatePtr->size# " << BlobsToReplicatePtr->size() + << " UnreplicatedBlobRecords.size# " << UnreplicatedBlobRecords.size()); + for (const TLogoBlobID& id : *BlobsToReplicatePtr) { + Y_DEBUG_ABORT_UNLESS(UnreplicatedBlobRecords.contains(id)); + } +#endif + if (BlobsToReplicatePtr->empty()) { // no more blobs to replicate -- consider replication finished finished = true; - Y_DEBUG_ABORT_UNLESS(UnreplicatedBlobRecords.empty()); for (const auto& donor : std::exchange(DonorQueue, {})) { if (donor) { DropDonor(*donor); @@ -478,8 +487,7 @@ namespace NKikimr { const TInstant timeAtEnd = last->End; if (workAtBegin < workAtEnd || timeAtEnd < timeAtBegin) { - Y_DEBUG_ABORT_UNLESS(false); - return {}; + return {}; // can't evaluate } const double workPerSecond = (workAtBegin - workAtEnd) / (timeAtEnd - timeAtBegin).SecondsFloat(); diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h index e96fcde9e68..59f0b6390b9 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h @@ -109,26 +109,18 @@ namespace NKikimr { } }; - enum class ERecoverStatus { - UNKNOWN, - RESTORED, - PHANTOM_CHECK, - RETRY, - }; - public: TRecoveryMachine( std::shared_ptr<TReplCtx> replCtx, - TEvReplFinished::TInfoPtr replInfo, - TBlobIdQueuePtr unreplicatedBlobsPtr) + TEvReplFinished::TInfoPtr replInfo) : ReplCtx(std::move(replCtx)) , ReplInfo(replInfo) - , UnreplicatedBlobsPtr(std::move(unreplicatedBlobsPtr)) , LostVec(TMemoryConsumer(ReplCtx->VCtx->Replication)) , Arena(&TRopeArenaBackend::Allocate) {} - ERecoverStatus Recover(TPartSet& item, TRecoveredBlobsQueue& rbq, NMatrix::TVectorType& parts, TIngress& ingress) { + template<typename TBlobProcessor> + void Recover(TPartSet& item, TRecoveredBlobsQueue& rbq, TBlobProcessor&& processor) { const TLogoBlobID& id = item.Id; Y_ABORT_UNLESS(!id.PartId()); Y_ABORT_UNLESS(!LastRecoveredId || *LastRecoveredId < id); @@ -137,23 +129,22 @@ namespace NKikimr { RecoverMetadata(id, rbq); while (!LostVec.empty() && LostVec.front().Id < id) { - SkipItem(LostVec.front()); + SkipItem(LostVec.front(), processor); LostVec.pop_front(); } if (LostVec.empty() || LostVec.front().Id != id) { STLOG(PRI_ERROR, BS_REPL, BSVR27, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "blob not in LostVec"), (BlobId, id)); - return ERecoverStatus::RESTORED; + return; } const TLost& lost = LostVec.front(); Y_ABORT_UNLESS(lost.Id == id); - const TBlobStorageGroupType groupType = ReplCtx->VCtx->Top->GType; + const NMatrix::TVectorType parts = lost.PartsToRecover; - parts = lost.PartsToRecover; - ingress = lost.Ingress; + const TBlobStorageGroupType groupType = ReplCtx->VCtx->Top->GType; ui32 partsSize = 0; bool hasExactParts = false; @@ -169,19 +160,18 @@ namespace NKikimr { Y_DEBUG_ABORT_UNLESS((item.PartsMask >> groupType.TotalPartCount()) == 0); const ui32 presentParts = PopCount(item.PartsMask); bool canRestore = presentParts >= groupType.MinimalRestorablePartCount(); - ERecoverStatus status = ERecoverStatus::UNKNOWN; // first of all, count present parts and recover only if there are enough of these parts if (!canRestore && needToRestore && !hasExactParts) { if (lost.PossiblePhantom) { - status = ERecoverStatus::PHANTOM_CHECK; + processor.AddPhantomBlobRecord(item, lost.Ingress, lost.PartsToRecover); } else { STLOG(PRI_INFO, BS_REPL, BSVR28, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "not enough data parts to recover"), (BlobId, id), (NumPresentParts, presentParts), (MinParts, groupType.DataParts()), (PartSet, item.ToString()), (Ingress, lost.Ingress.ToString(ReplCtx->VCtx->Top.get(), ReplCtx->VCtx->ShortSelfVDisk, id))); - BlobDone(id, false, true, &TEvReplFinished::TInfo::ItemsNotRecovered); - status = ERecoverStatus::RETRY; + BlobDone(item, false /*success*/, true /*unrecovered*/, &TEvReplFinished::TInfo::ItemsNotRecovered, + lost.Ingress, false /*looksLikePhantom*/, processor); } } else { // recover @@ -235,59 +225,66 @@ namespace NKikimr { ReplInfo->BytesRecovered += partsSize; if (!numMissingParts) { - BlobDone(id, true, false, &TEvReplFinished::TInfo::ItemsRecovered); + BlobDone(item, true /*success*/, false /*unrecovered*/, &TEvReplFinished::TInfo::ItemsRecovered, + {}, false /*looksLikePhantom*/, processor); if (lost.PossiblePhantom) { ++ReplCtx->MonGroup.ReplPhantomLikeRecovered(); } - status = ERecoverStatus::RESTORED; } else if (lost.PossiblePhantom) { - status = ERecoverStatus::PHANTOM_CHECK; // run phantom check for this blob + processor.AddPhantomBlobRecord(item, lost.Ingress, lost.PartsToRecover); } else { - BlobDone(id, false, true, &TEvReplFinished::TInfo::ItemsPartiallyRecovered); - status = ERecoverStatus::RETRY; + BlobDone(item, false /*success*/, true /*unrecovered*/, &TEvReplFinished::TInfo::ItemsPartiallyRecovered, + lost.Ingress, false /*looksLikePhantom*/, processor); } } catch (const std::exception& ex) { ++ReplCtx->MonGroup.ReplRecoveryGroupTypeErrors(); STLOG(PRI_ERROR, BS_REPL, BSVR29, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "recovery exception"), (BlobId, id), (Error, TString(ex.what()))); - BlobDone(id, false, true, &TEvReplFinished::TInfo::ItemsException); - status = ERecoverStatus::RETRY; + BlobDone(item, false /*success*/, true /*unrecovered*/, &TEvReplFinished::TInfo::ItemsException, + lost.Ingress, false /*looksLikePhantom*/, processor); } } LostVec.pop_front(); - Y_ABORT_UNLESS(status != ERecoverStatus::UNKNOWN); - return status; } - void ProcessPhantomBlob(const TLogoBlobID& id, NMatrix::TVectorType parts, bool isPhantom, bool looksLikePhantom) { + template<typename TBlobProcessor> + void ProcessPhantomBlob(const TPartSet& item, NMatrix::TVectorType parts, bool isPhantom, bool looksLikePhantom, + TIngress ingress, TBlobProcessor&& processor) { STLOG(PRI_INFO, BS_REPL, BSVR00, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "phantom check completed"), - (BlobId, id), (Parts, parts), (IsPhantom, isPhantom), (LooksLikePhantom, looksLikePhantom)); + (BlobId, item.Id), (Parts, parts), (IsPhantom, isPhantom), (LooksLikePhantom, looksLikePhantom)); const bool success = isPhantom; // confirmed phantom blob + const bool unrecovered = !looksLikePhantom; // if blob doesn't look like phantom, then it is ordinary unrecovered blob Y_DEBUG_ABORT_UNLESS(isPhantom <= looksLikePhantom); ++(success ? ReplCtx->MonGroup.ReplPhantomLikeDropped() : ReplCtx->MonGroup.ReplPhantomLikeUnrecovered()); - BlobDone(id, success, !looksLikePhantom, looksLikePhantom - ? &TEvReplFinished::TInfo::ItemsPhantom - : &TEvReplFinished::TInfo::ItemsNonPhantom); + auto counter = success ? &TEvReplFinished::TInfo::ItemsPhantom : + looksLikePhantom ? &TEvReplFinished::TInfo::ItemsNonPhantom : + &TEvReplFinished::TInfo::ItemsNotRecovered; + + BlobDone(item, success, unrecovered, counter, ingress, looksLikePhantom, processor); } - void BlobDone(TLogoBlobID id, bool success, bool unrecovered, ui64 TEvReplFinished::TInfo::*counter) { - STLOG(PRI_DEBUG, BS_REPL, BSVR35, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "BlobDone"), (BlobId, id), + template<typename TBlobProcessor> + void BlobDone(const TPartSet& item, bool success, bool unrecovered, ui64 TEvReplFinished::TInfo::*counter, + TIngress ingress, bool looksLikePhantom, TBlobProcessor&& processor) { + STLOG(PRI_DEBUG, BS_REPL, BSVR35, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "BlobDone"), (BlobId, item.Id), (Success, success)); + const ui32 size = item.Id.BlobSize(); if (success) { - ReplInfo->WorkUnitsPerformed += id.BlobSize(); - ReplCtx->MonGroup.ReplWorkUnitsDone() += id.BlobSize(); - ReplCtx->MonGroup.ReplWorkUnitsRemaining() -= id.BlobSize(); + ReplInfo->WorkUnitsPerformed += size; + ReplCtx->MonGroup.ReplWorkUnitsDone() += size; + ReplCtx->MonGroup.ReplWorkUnitsRemaining() -= size; ++ReplCtx->MonGroup.ReplItemsDone(); --ReplCtx->MonGroup.ReplItemsRemaining(); + processor.DropUnreplicatedBlobRecord(item.Id); } else { - UnreplicatedBlobsPtr->push_back(id); + processor.AddUnreplicatedBlobRecord(item, ingress, looksLikePhantom); } ++((*ReplInfo).*counter); @@ -295,15 +292,13 @@ namespace NKikimr { } // finish work - void Finish(TRecoveredBlobsQueue& rbq) { + template<typename TBlobProcessor> + void Finish(TRecoveredBlobsQueue& rbq, TBlobProcessor&& processor) { RecoverMetadata(TLogoBlobID(Max<ui64>(), Max<ui64>(), Max<ui64>()), rbq); for (auto&& item : LostVec) { - SkipItem(item); + SkipItem(item, processor); } LostVec.clear(); - - // sort unreplicated blobs vector as it may contain records in incorrect order due to phantom checking - std::sort(UnreplicatedBlobsPtr->begin(), UnreplicatedBlobsPtr->end()); } // add next task during preparation phase @@ -361,7 +356,6 @@ namespace NKikimr { std::shared_ptr<TReplCtx> ReplCtx; TEvReplFinished::TInfoPtr ReplInfo; - TBlobIdQueuePtr UnreplicatedBlobsPtr; TLostVec LostVec; TDeque<TLogoBlobID> MetadataParts; TRopeArena Arena; @@ -404,14 +398,15 @@ namespace NKikimr { } } - void SkipItem(const TLost& item) { + template<typename TBlobProcessor> + void SkipItem(const TLost& item, TBlobProcessor&& processor) { STLOG(PRI_INFO, BS_REPL, BSVR31, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "TRecoveryMachine::SkipItem"), (BlobId, item.Id)); ++ReplInfo->ItemsNotRecovered; - UnreplicatedBlobsPtr->push_back(item.Id); if (item.PossiblePhantom) { ++ReplCtx->MonGroup.ReplPhantomLikeUnrecovered(); } + processor.AddUnreplicatedBlobRecord({item.Id, ReplCtx->VCtx->Top->GType}, item.Ingress, item.PossiblePhantom); } }; diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp index c6fd481146e..3134c22e136 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp @@ -88,7 +88,7 @@ namespace NKikimr { auto info = MakeIntrusive<TEvReplFinished::TInfo>(); info->WorkUnitsPlanned = Max<ui64>(); TBlobIdQueuePtr unreplicatedBlobsPtr = std::make_shared<TBlobIdQueue>(); - NRepl::TRecoveryMachine m(replCtx, info, unreplicatedBlobsPtr); + NRepl::TRecoveryMachine m(replCtx, info); TMap<TLogoBlobID, TVector<TString>> data = GenerateData(10000, 1024, groupInfo, vdisks); for (const auto& pair : data) { const TLogoBlobID& id = pair.first; @@ -149,10 +149,16 @@ namespace NKikimr { p.AddData(0, TLogoBlobID(id, partIndex + 1), NKikimrProto::OK, TRope(v[i])); } NRepl::TRecoveryMachine::TRecoveredBlobsQueue rbq; - NMatrix::TVectorType parts; - TIngress ingress; - const bool success = m.Recover(p, rbq, parts, ingress) == NRepl::TRecoveryMachine::ERecoverStatus::RESTORED; - Y_ABORT_UNLESS(success); + struct { + void AddUnreplicatedBlobRecord(const NRepl::TRecoveryMachine::TPartSet& /*item*/, TIngress /*ingress*/, + bool /*looksLikePhantom*/) {} + void DropUnreplicatedBlobRecord(const TLogoBlobID& /*id*/) {} + void AddPhantomBlobRecord(const NRepl::TRecoveryMachine::TPartSet& /*item*/, TIngress /*ingress*/, + NMatrix::TVectorType /*partsToRecover*/) { + Y_ABORT(); + } + } processor; + m.Recover(p, rbq, processor); ui8 partIndex; for (partIndex = 0; partIndex < groupInfo->Type.BlobSubgroupSize(); ++partIndex) { |