aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-11-15 14:46:01 +0300
committeralexvru <alexvru@ydb.tech>2023-11-15 16:06:24 +0300
commitcba5bedd1aaa67cdfad2dbdcc98c8a72acd39382 (patch)
tree0cb739c2e8cbd0b02a9a8c7b2e44290c2d6e064a
parent92c4058c74bb6bf043bbb65cb375f5a34540d4b6 (diff)
downloadydb-cba5bedd1aaa67cdfad2dbdcc98c8a72acd39382.tar.gz
Fix replication bug KIKIMR-20008
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp68
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp14
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h91
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp16
4 files changed, 106 insertions, 83 deletions
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
index c4a2a28b9ff..c1d4c81ffb4 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
@@ -60,7 +60,7 @@ namespace NKikimr {
ReplInfo->ItemsTotal += UnreplicatedBlobsPtr->size();
// prepare the recovery machine
- RecoveryMachine = std::make_unique<TRecoveryMachine>(ReplCtx, ReplInfo, std::move(UnreplicatedBlobsPtr));
+ RecoveryMachine = std::make_unique<TRecoveryMachine>(ReplCtx, ReplInfo);
// request for snapshot
Send(ReplCtx->SkeletonId, new TEvTakeHullSnapshot(true));
@@ -322,6 +322,13 @@ namespace NKikimr {
void Bootstrap() {
STLOG(PRI_DEBUG, BS_REPL, BSVR02, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "THullReplJobActor::Bootstrap"));
+
+ if (BlobsToReplicatePtr) {
+ for (const TLogoBlobID& id : *BlobsToReplicatePtr) {
+ Y_ABORT_UNLESS(UnreplicatedBlobRecords.contains(id));
+ }
+ }
+
TimeAccount.SetState(ETimeState::PREPARE_PLAN);
auto actor = std::make_unique<THullReplPlannerActor>(ReplCtx, GInfo, StartKey, ReplInfo, BlobsToReplicatePtr, UnreplicatedBlobsPtr);
auto aid = RunInBatchPool(ActorContext(), actor.release());
@@ -336,6 +343,20 @@ namespace NKikimr {
LastKey = ev->Get()->LastKey;
Eof = ev->Get()->Eof;
+ if (BlobsToReplicatePtr) {
+ TUnreplicatedBlobRecords temp;
+ RecoveryMachine->ForEach([&](TLogoBlobID id, NMatrix::TVectorType /*partsToRecover*/, TIngress /*ingress*/) {
+ auto nh = UnreplicatedBlobRecords.extract(id);
+ Y_ABORT_UNLESS(nh);
+ temp.insert(std::move(nh));
+ });
+ for (auto it = UnreplicatedBlobRecords.begin(); it != UnreplicatedBlobRecords.end(); ) {
+ const auto& [key, value] = *it++;
+ DropUnreplicatedBlobRecord(key);
+ }
+ UnreplicatedBlobRecords = std::move(temp);
+ }
+
auto& mon = ReplCtx->MonGroup;
if ((mon.ReplWorkUnitsRemaining() && ReplInfo->WorkUnitsTotal > (ui64)mon.ReplWorkUnitsRemaining()) ||
@@ -599,26 +620,7 @@ namespace NKikimr {
}
// recover data
- NMatrix::TVectorType parts;
- TIngress ingress;
- switch (RecoveryMachine->Recover(item, RecoveryQueue, parts, ingress)) {
- case TRecoveryMachine::ERecoverStatus::UNKNOWN:
- Y_ABORT();
-
- case TRecoveryMachine::ERecoverStatus::RESTORED:
- UnreplicatedBlobRecords.erase(item.Id);
- break;
-
- case TRecoveryMachine::ERecoverStatus::PHANTOM_CHECK:
- STLOG(PRI_INFO, BS_REPL, BSVR33, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "Sending phantom validation query"),
- (GroupId, GInfo->GroupID), (CurKey, item.Id));
- PhantomChecksPending.emplace_back(item.Id, parts, item, ingress);
- break;
-
- case TRecoveryMachine::ERecoverStatus::RETRY:
- AddUnreplicatedBlobRecord(item, ingress, false);
- break;
- }
+ RecoveryMachine->Recover(item, RecoveryQueue, *this);
CurrentItem.reset();
// process recovered items, if any; queueProcessed.first will be false when writer is not ready for new data
@@ -650,10 +652,13 @@ namespace NKikimr {
TimeAccount.SetState(ETimeState::OTHER);
if (!RecoveryMachineFinished) {
- RecoveryMachine->Finish(RecoveryQueue);
+ RecoveryMachine->Finish(RecoveryQueue, *this);
RecoveryMachineFinished = true;
STLOG(PRI_DEBUG, BS_REPL, BSVR07, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "finished recovery machine"),
(RecoveryQueueSize, RecoveryQueue.size()));
+
+ // sort unreplicated blobs vector as it may contain records in incorrect order due to phantom checking
+ std::sort(UnreplicatedBlobsPtr->begin(), UnreplicatedBlobsPtr->end());
return true;
}
@@ -732,12 +737,9 @@ namespace NKikimr {
auto node = isPhantom.extract(id);
Y_ABORT_UNLESS(node);
auto [phantom, looksLikePhantom] = node.mapped();
- RecoveryMachine->ProcessPhantomBlob(id, parts, phantom, looksLikePhantom);
+ RecoveryMachine->ProcessPhantomBlob(partSet, parts, phantom, looksLikePhantom, ingress, *this);
if (phantom) {
Phantoms.push_back(id);
- AddUnreplicatedBlobRecord(partSet, ingress, true);
- } else {
- UnreplicatedBlobRecords.erase(id);
}
}
@@ -747,7 +749,12 @@ namespace NKikimr {
Merge();
}
- void AddUnreplicatedBlobRecord(const TRecoveryMachine::TPartSet& item, const TIngress& ingress, bool looksLikePhantom) {
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // TRecoveryMachine interoperation
+
+ friend class TRecoveryMachine;
+
+ void AddUnreplicatedBlobRecord(const TRecoveryMachine::TPartSet& item, TIngress ingress, bool looksLikePhantom) {
TUnreplicatedBlobRecord record{ingress, item.PartsMask, item.DisksRepliedOK, item.DisksRepliedNODATA,
item.DisksRepliedNOT_YET, item.DisksRepliedOther, looksLikePhantom};
const auto [it, inserted] = UnreplicatedBlobRecords.try_emplace(item.Id, record);
@@ -762,6 +769,7 @@ namespace NKikimr {
} else if (record.LooksLikePhantom) {
++ReplCtx->MonGroup.ReplPhantomBlobsWithProblems();
}
+ UnreplicatedBlobsPtr->push_back(item.Id);
}
void DropUnreplicatedBlobRecord(const TLogoBlobID& id) {
@@ -774,6 +782,12 @@ namespace NKikimr {
}
}
+ void AddPhantomBlobRecord(const TRecoveryMachine::TPartSet& item, TIngress ingress, NMatrix::TVectorType partsToRecover) {
+ STLOG(PRI_INFO, BS_REPL, BSVR33, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "Sending phantom validation query"),
+ (GroupId, GInfo->GroupID), (CurKey, item.Id));
+ PhantomChecksPending.emplace_back(item.Id, partsToRecover, item, ingress);
+ }
+
EProcessQueueAction ProcessQueue() {
while (!RecoveryQueue.empty()) {
auto& front = RecoveryQueue.front();
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
index d32fc058baa..a4ce0107c9f 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
@@ -351,10 +351,19 @@ namespace NKikimr {
if (info->Eof) { // when it is the last quantum for some donor, rotate the blob sets
BlobsToReplicatePtr = std::move(UnreplicatedBlobsPtr);
UnreplicatedBlobsPtr = std::make_shared<TBlobIdQueue>();
+
+#ifndef NDEBUG
+ Y_VERIFY_DEBUG_S(BlobsToReplicatePtr->size() == UnreplicatedBlobRecords.size(),
+ "BlobsToReplicatePtr->size# " << BlobsToReplicatePtr->size()
+ << " UnreplicatedBlobRecords.size# " << UnreplicatedBlobRecords.size());
+ for (const TLogoBlobID& id : *BlobsToReplicatePtr) {
+ Y_DEBUG_ABORT_UNLESS(UnreplicatedBlobRecords.contains(id));
+ }
+#endif
+
if (BlobsToReplicatePtr->empty()) {
// no more blobs to replicate -- consider replication finished
finished = true;
- Y_DEBUG_ABORT_UNLESS(UnreplicatedBlobRecords.empty());
for (const auto& donor : std::exchange(DonorQueue, {})) {
if (donor) {
DropDonor(*donor);
@@ -478,8 +487,7 @@ namespace NKikimr {
const TInstant timeAtEnd = last->End;
if (workAtBegin < workAtEnd || timeAtEnd < timeAtBegin) {
- Y_DEBUG_ABORT_UNLESS(false);
- return {};
+ return {}; // can't evaluate
}
const double workPerSecond = (workAtBegin - workAtEnd) / (timeAtEnd - timeAtBegin).SecondsFloat();
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
index e96fcde9e68..59f0b6390b9 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
@@ -109,26 +109,18 @@ namespace NKikimr {
}
};
- enum class ERecoverStatus {
- UNKNOWN,
- RESTORED,
- PHANTOM_CHECK,
- RETRY,
- };
-
public:
TRecoveryMachine(
std::shared_ptr<TReplCtx> replCtx,
- TEvReplFinished::TInfoPtr replInfo,
- TBlobIdQueuePtr unreplicatedBlobsPtr)
+ TEvReplFinished::TInfoPtr replInfo)
: ReplCtx(std::move(replCtx))
, ReplInfo(replInfo)
- , UnreplicatedBlobsPtr(std::move(unreplicatedBlobsPtr))
, LostVec(TMemoryConsumer(ReplCtx->VCtx->Replication))
, Arena(&TRopeArenaBackend::Allocate)
{}
- ERecoverStatus Recover(TPartSet& item, TRecoveredBlobsQueue& rbq, NMatrix::TVectorType& parts, TIngress& ingress) {
+ template<typename TBlobProcessor>
+ void Recover(TPartSet& item, TRecoveredBlobsQueue& rbq, TBlobProcessor&& processor) {
const TLogoBlobID& id = item.Id;
Y_ABORT_UNLESS(!id.PartId());
Y_ABORT_UNLESS(!LastRecoveredId || *LastRecoveredId < id);
@@ -137,23 +129,22 @@ namespace NKikimr {
RecoverMetadata(id, rbq);
while (!LostVec.empty() && LostVec.front().Id < id) {
- SkipItem(LostVec.front());
+ SkipItem(LostVec.front(), processor);
LostVec.pop_front();
}
if (LostVec.empty() || LostVec.front().Id != id) {
STLOG(PRI_ERROR, BS_REPL, BSVR27, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "blob not in LostVec"),
(BlobId, id));
- return ERecoverStatus::RESTORED;
+ return;
}
const TLost& lost = LostVec.front();
Y_ABORT_UNLESS(lost.Id == id);
- const TBlobStorageGroupType groupType = ReplCtx->VCtx->Top->GType;
+ const NMatrix::TVectorType parts = lost.PartsToRecover;
- parts = lost.PartsToRecover;
- ingress = lost.Ingress;
+ const TBlobStorageGroupType groupType = ReplCtx->VCtx->Top->GType;
ui32 partsSize = 0;
bool hasExactParts = false;
@@ -169,19 +160,18 @@ namespace NKikimr {
Y_DEBUG_ABORT_UNLESS((item.PartsMask >> groupType.TotalPartCount()) == 0);
const ui32 presentParts = PopCount(item.PartsMask);
bool canRestore = presentParts >= groupType.MinimalRestorablePartCount();
- ERecoverStatus status = ERecoverStatus::UNKNOWN;
// first of all, count present parts and recover only if there are enough of these parts
if (!canRestore && needToRestore && !hasExactParts) {
if (lost.PossiblePhantom) {
- status = ERecoverStatus::PHANTOM_CHECK;
+ processor.AddPhantomBlobRecord(item, lost.Ingress, lost.PartsToRecover);
} else {
STLOG(PRI_INFO, BS_REPL, BSVR28, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "not enough data parts to recover"),
(BlobId, id), (NumPresentParts, presentParts), (MinParts, groupType.DataParts()),
(PartSet, item.ToString()), (Ingress, lost.Ingress.ToString(ReplCtx->VCtx->Top.get(),
ReplCtx->VCtx->ShortSelfVDisk, id)));
- BlobDone(id, false, true, &TEvReplFinished::TInfo::ItemsNotRecovered);
- status = ERecoverStatus::RETRY;
+ BlobDone(item, false /*success*/, true /*unrecovered*/, &TEvReplFinished::TInfo::ItemsNotRecovered,
+ lost.Ingress, false /*looksLikePhantom*/, processor);
}
} else {
// recover
@@ -235,59 +225,66 @@ namespace NKikimr {
ReplInfo->BytesRecovered += partsSize;
if (!numMissingParts) {
- BlobDone(id, true, false, &TEvReplFinished::TInfo::ItemsRecovered);
+ BlobDone(item, true /*success*/, false /*unrecovered*/, &TEvReplFinished::TInfo::ItemsRecovered,
+ {}, false /*looksLikePhantom*/, processor);
if (lost.PossiblePhantom) {
++ReplCtx->MonGroup.ReplPhantomLikeRecovered();
}
- status = ERecoverStatus::RESTORED;
} else if (lost.PossiblePhantom) {
- status = ERecoverStatus::PHANTOM_CHECK; // run phantom check for this blob
+ processor.AddPhantomBlobRecord(item, lost.Ingress, lost.PartsToRecover);
} else {
- BlobDone(id, false, true, &TEvReplFinished::TInfo::ItemsPartiallyRecovered);
- status = ERecoverStatus::RETRY;
+ BlobDone(item, false /*success*/, true /*unrecovered*/, &TEvReplFinished::TInfo::ItemsPartiallyRecovered,
+ lost.Ingress, false /*looksLikePhantom*/, processor);
}
} catch (const std::exception& ex) {
++ReplCtx->MonGroup.ReplRecoveryGroupTypeErrors();
STLOG(PRI_ERROR, BS_REPL, BSVR29, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "recovery exception"),
(BlobId, id), (Error, TString(ex.what())));
- BlobDone(id, false, true, &TEvReplFinished::TInfo::ItemsException);
- status = ERecoverStatus::RETRY;
+ BlobDone(item, false /*success*/, true /*unrecovered*/, &TEvReplFinished::TInfo::ItemsException,
+ lost.Ingress, false /*looksLikePhantom*/, processor);
}
}
LostVec.pop_front();
- Y_ABORT_UNLESS(status != ERecoverStatus::UNKNOWN);
- return status;
}
- void ProcessPhantomBlob(const TLogoBlobID& id, NMatrix::TVectorType parts, bool isPhantom, bool looksLikePhantom) {
+ template<typename TBlobProcessor>
+ void ProcessPhantomBlob(const TPartSet& item, NMatrix::TVectorType parts, bool isPhantom, bool looksLikePhantom,
+ TIngress ingress, TBlobProcessor&& processor) {
STLOG(PRI_INFO, BS_REPL, BSVR00, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "phantom check completed"),
- (BlobId, id), (Parts, parts), (IsPhantom, isPhantom), (LooksLikePhantom, looksLikePhantom));
+ (BlobId, item.Id), (Parts, parts), (IsPhantom, isPhantom), (LooksLikePhantom, looksLikePhantom));
const bool success = isPhantom; // confirmed phantom blob
+ const bool unrecovered = !looksLikePhantom; // if blob doesn't look like phantom, then it is ordinary unrecovered blob
Y_DEBUG_ABORT_UNLESS(isPhantom <= looksLikePhantom);
++(success
? ReplCtx->MonGroup.ReplPhantomLikeDropped()
: ReplCtx->MonGroup.ReplPhantomLikeUnrecovered());
- BlobDone(id, success, !looksLikePhantom, looksLikePhantom
- ? &TEvReplFinished::TInfo::ItemsPhantom
- : &TEvReplFinished::TInfo::ItemsNonPhantom);
+ auto counter = success ? &TEvReplFinished::TInfo::ItemsPhantom :
+ looksLikePhantom ? &TEvReplFinished::TInfo::ItemsNonPhantom :
+ &TEvReplFinished::TInfo::ItemsNotRecovered;
+
+ BlobDone(item, success, unrecovered, counter, ingress, looksLikePhantom, processor);
}
- void BlobDone(TLogoBlobID id, bool success, bool unrecovered, ui64 TEvReplFinished::TInfo::*counter) {
- STLOG(PRI_DEBUG, BS_REPL, BSVR35, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "BlobDone"), (BlobId, id),
+ template<typename TBlobProcessor>
+ void BlobDone(const TPartSet& item, bool success, bool unrecovered, ui64 TEvReplFinished::TInfo::*counter,
+ TIngress ingress, bool looksLikePhantom, TBlobProcessor&& processor) {
+ STLOG(PRI_DEBUG, BS_REPL, BSVR35, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "BlobDone"), (BlobId, item.Id),
(Success, success));
+ const ui32 size = item.Id.BlobSize();
if (success) {
- ReplInfo->WorkUnitsPerformed += id.BlobSize();
- ReplCtx->MonGroup.ReplWorkUnitsDone() += id.BlobSize();
- ReplCtx->MonGroup.ReplWorkUnitsRemaining() -= id.BlobSize();
+ ReplInfo->WorkUnitsPerformed += size;
+ ReplCtx->MonGroup.ReplWorkUnitsDone() += size;
+ ReplCtx->MonGroup.ReplWorkUnitsRemaining() -= size;
++ReplCtx->MonGroup.ReplItemsDone();
--ReplCtx->MonGroup.ReplItemsRemaining();
+ processor.DropUnreplicatedBlobRecord(item.Id);
} else {
- UnreplicatedBlobsPtr->push_back(id);
+ processor.AddUnreplicatedBlobRecord(item, ingress, looksLikePhantom);
}
++((*ReplInfo).*counter);
@@ -295,15 +292,13 @@ namespace NKikimr {
}
// finish work
- void Finish(TRecoveredBlobsQueue& rbq) {
+ template<typename TBlobProcessor>
+ void Finish(TRecoveredBlobsQueue& rbq, TBlobProcessor&& processor) {
RecoverMetadata(TLogoBlobID(Max<ui64>(), Max<ui64>(), Max<ui64>()), rbq);
for (auto&& item : LostVec) {
- SkipItem(item);
+ SkipItem(item, processor);
}
LostVec.clear();
-
- // sort unreplicated blobs vector as it may contain records in incorrect order due to phantom checking
- std::sort(UnreplicatedBlobsPtr->begin(), UnreplicatedBlobsPtr->end());
}
// add next task during preparation phase
@@ -361,7 +356,6 @@ namespace NKikimr {
std::shared_ptr<TReplCtx> ReplCtx;
TEvReplFinished::TInfoPtr ReplInfo;
- TBlobIdQueuePtr UnreplicatedBlobsPtr;
TLostVec LostVec;
TDeque<TLogoBlobID> MetadataParts;
TRopeArena Arena;
@@ -404,14 +398,15 @@ namespace NKikimr {
}
}
- void SkipItem(const TLost& item) {
+ template<typename TBlobProcessor>
+ void SkipItem(const TLost& item, TBlobProcessor&& processor) {
STLOG(PRI_INFO, BS_REPL, BSVR31, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "TRecoveryMachine::SkipItem"),
(BlobId, item.Id));
++ReplInfo->ItemsNotRecovered;
- UnreplicatedBlobsPtr->push_back(item.Id);
if (item.PossiblePhantom) {
++ReplCtx->MonGroup.ReplPhantomLikeUnrecovered();
}
+ processor.AddUnreplicatedBlobRecord({item.Id, ReplCtx->VCtx->Top->GType}, item.Ingress, item.PossiblePhantom);
}
};
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
index c6fd481146e..3134c22e136 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
@@ -88,7 +88,7 @@ namespace NKikimr {
auto info = MakeIntrusive<TEvReplFinished::TInfo>();
info->WorkUnitsPlanned = Max<ui64>();
TBlobIdQueuePtr unreplicatedBlobsPtr = std::make_shared<TBlobIdQueue>();
- NRepl::TRecoveryMachine m(replCtx, info, unreplicatedBlobsPtr);
+ NRepl::TRecoveryMachine m(replCtx, info);
TMap<TLogoBlobID, TVector<TString>> data = GenerateData(10000, 1024, groupInfo, vdisks);
for (const auto& pair : data) {
const TLogoBlobID& id = pair.first;
@@ -149,10 +149,16 @@ namespace NKikimr {
p.AddData(0, TLogoBlobID(id, partIndex + 1), NKikimrProto::OK, TRope(v[i]));
}
NRepl::TRecoveryMachine::TRecoveredBlobsQueue rbq;
- NMatrix::TVectorType parts;
- TIngress ingress;
- const bool success = m.Recover(p, rbq, parts, ingress) == NRepl::TRecoveryMachine::ERecoverStatus::RESTORED;
- Y_ABORT_UNLESS(success);
+ struct {
+ void AddUnreplicatedBlobRecord(const NRepl::TRecoveryMachine::TPartSet& /*item*/, TIngress /*ingress*/,
+ bool /*looksLikePhantom*/) {}
+ void DropUnreplicatedBlobRecord(const TLogoBlobID& /*id*/) {}
+ void AddPhantomBlobRecord(const NRepl::TRecoveryMachine::TPartSet& /*item*/, TIngress /*ingress*/,
+ NMatrix::TVectorType /*partsToRecover*/) {
+ Y_ABORT();
+ }
+ } processor;
+ m.Recover(p, rbq, processor);
ui8 partIndex;
for (partIndex = 0; partIndex < groupInfo->Type.BlobSubgroupSize(); ++partIndex) {