about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author alexvru <alexvru@ydb.tech> 2022-11-18 11:56:10 +0300
committer alexvru <alexvru@ydb.tech> 2022-11-18 11:56:10 +0300
commit 551ae898cac3eee67cb5de921f630f485968e470 (patch)
tree b8d0c7083af72c33b7591b925c8d0989e79fcbe2
parent 0c7d05761ea0a649f947749448b1eb1bb812fc93 (diff)
download ydb-551ae898cac3eee67cb5de921f630f485968e470.tar.gz
Improve PhantomCheck logic
-rw-r--r-- ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h 38
-rw-r--r-- ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp 175
-rw-r--r-- ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp 22
-rw-r--r-- ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h 6
-rw-r--r-- ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h 91
-rw-r--r-- ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp 7
-rw-r--r-- ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp 16
-rw-r--r-- ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp 18
-rw-r--r-- ydb/core/protos/node_whiteboard.proto 2
9 files changed, 223 insertions, 152 deletions
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
index 118cc062cad..3e5b5edc9ba 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
@@ -200,19 +200,16 @@ public:
COUNTER_INIT(ReplHugeBlobsRecovered, true);
COUNTER_INIT(ReplHugeBlobBytesRecovered, true);
COUNTER_INIT(ReplChunksWritten, true);
- COUNTER_INIT(ReplCurrentUnreplicatedParts, false);
- COUNTER_INIT(ReplCurrentUnreplicatedBytes, false);
- COUNTER_INIT(ReplCurrentPhantoms, false);
- COUNTER_INIT(ReplUnreplicatedParts, false);
- COUNTER_INIT(ReplUnreplicatedBytes, false);
- COUNTER_INIT(ReplPhantoms, false);
COUNTER_INIT(ReplUnreplicatedVDisks, false);
- COUNTER_INIT(ReplUnreplicatedBlobs, false);
COUNTER_INIT(ReplVGetBytesReceived, true);
- COUNTER_INIT(ReplCurrentNumUnrecoveredPhantomBlobs, false);
- COUNTER_INIT(ReplCurrentNumUnrecoveredNonPhantomBlobs, false);
- COUNTER_INIT(ReplNumUnrecoveredPhantomBlobs, false);
- COUNTER_INIT(ReplNumUnrecoveredNonPhantomBlobs, false);
+ COUNTER_INIT(ReplPhantomLikeDiscovered, false);
+ COUNTER_INIT(ReplPhantomLikeRecovered, false);
+ COUNTER_INIT(ReplPhantomLikeUnrecovered, false);
+ COUNTER_INIT(ReplPhantomLikeDropped, false);
+ COUNTER_INIT(ReplWorkUnitsDone, false);
+ COUNTER_INIT(ReplWorkUnitsRemaining, false);
+ COUNTER_INIT(ReplUnreplicatedPhantoms, false);
+ COUNTER_INIT(ReplUnreplicatedNonPhantoms, false);
}
COUNTER_DEF(SyncerVSyncMessagesSent);
@@ -222,19 +219,16 @@ public:
COUNTER_DEF(ReplHugeBlobsRecovered);
COUNTER_DEF(ReplHugeBlobBytesRecovered);
COUNTER_DEF(ReplChunksWritten);
- COUNTER_DEF(ReplCurrentUnreplicatedParts);
- COUNTER_DEF(ReplCurrentUnreplicatedBytes);
- COUNTER_DEF(ReplCurrentPhantoms);
- COUNTER_DEF(ReplUnreplicatedParts);
- COUNTER_DEF(ReplUnreplicatedBytes);
- COUNTER_DEF(ReplPhantoms);
COUNTER_DEF(ReplUnreplicatedVDisks);
- COUNTER_DEF(ReplUnreplicatedBlobs);
COUNTER_DEF(ReplVGetBytesReceived);
- COUNTER_DEF(ReplCurrentNumUnrecoveredPhantomBlobs);
- COUNTER_DEF(ReplCurrentNumUnrecoveredNonPhantomBlobs);
- COUNTER_DEF(ReplNumUnrecoveredPhantomBlobs);
- COUNTER_DEF(ReplNumUnrecoveredNonPhantomBlobs);
+ COUNTER_DEF(ReplPhantomLikeDiscovered);
+ COUNTER_DEF(ReplPhantomLikeRecovered);
+ COUNTER_DEF(ReplPhantomLikeUnrecovered);
+ COUNTER_DEF(ReplPhantomLikeDropped);
+ COUNTER_DEF(ReplWorkUnitsDone);
+ COUNTER_DEF(ReplWorkUnitsRemaining);
+ COUNTER_DEF(ReplUnreplicatedPhantoms);
+ COUNTER_DEF(ReplUnreplicatedNonPhantoms);
};
///////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
index 4f6287387d6..19ed6686f96 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
@@ -42,6 +42,7 @@ namespace NKikimr {
TIntrusivePtr<TBlobStorageGroupInfo> GInfo;
TActorId Recipient;
TLogoBlobID StartKey;
+ std::optional<TLogoBlobID> ReplyKey;
TEvReplFinished::TInfoPtr ReplInfo;
TBlobIdQueuePtr BlobsToReplicatePtr;
TBlobIdQueuePtr UnreplicatedBlobsPtr;
@@ -72,6 +73,7 @@ namespace NKikimr {
// create iterator for the logoblobs metabase
TLogoBlobsSnapshot::TIndexForwardIterator it(snap.HullCtx, &snap.LogoBlobsSnap);
bool eof = false;
+ bool resume = false;
const ui64 plannedEndTime = GetCycleCountFast() + DurationToCycles(ReplCtx->VDiskCfg->ReplPlanQuantum);
auto going = [&, first = true]() mutable { // the predicate that determines the length of the quantum
@@ -86,13 +88,18 @@ namespace NKikimr {
if (BlobsToReplicatePtr) {
// iterate over queue items and match them with iterator
- for (; !BlobsToReplicatePtr->empty() && going(); BlobsToReplicatePtr->pop()) {
+ for (; !BlobsToReplicatePtr->empty() && going(); BlobsToReplicatePtr->pop_front()) {
const TLogoBlobID& key = BlobsToReplicatePtr->front();
it.Seek(key);
if (it.Valid() && it.GetCurKey().LogoBlobID() == key) {
ProcessItem(it, barriers, allowKeepFlags);
}
}
+ if (RecoveryMachine->FullOfTasks() || QuantumBytes >= ReplCtx->VDiskCfg->ReplMaxQuantumBytes) {
+ for (const TLogoBlobID& id : *BlobsToReplicatePtr) {
+ ReplInfo->WorkUnitsTotal += id.BlobSize();
+ }
+ }
eof = BlobsToReplicatePtr->empty();
} else {
// scan through the index until we have enough blobs to recover or the time is out
@@ -101,18 +108,35 @@ namespace NKikimr {
}
if (it.Valid()) {
StartKey = it.GetCurKey().LogoBlobID(); // we gonna resume later starting from this key
+ if (!ReplyKey) {
+ ReplyKey = StartKey;
+ }
+
+ const TBlobStorageGroupInfo::TTopology& topology = *ReplCtx->VCtx->Top;
+ ui32 counter = 0;
+ for (; it.Valid(); it.Next()) {
+ if (++counter % 1024 == 0 && GetCycleCountFast() > plannedEndTime) {
+ resume = true;
+ break;
+ }
+
+ const TLogoBlobID key = it.GetCurKey().LogoBlobID();
+ const TMemRecLogoBlob memRec = it.GetMemRec();
+ const TIngress ingress = memRec.GetIngress();
+ const auto parts = ingress.PartsWeMustHaveLocally(&topology, ReplCtx->VCtx->ShortSelfVDisk,
+ key) - ingress.LocalParts(topology.GType);
+ if (!parts.Empty()) {
+ ReplInfo->WorkUnitsTotal += key.BlobSize();
+ }
+ }
} else {
eof = true;
}
}
- if (eof || RecoveryMachine->FullOfTasks() || QuantumBytes >= ReplCtx->VDiskCfg->ReplMaxQuantumBytes) {
- // adjust counters
- ReplCtx->MonGroup.ReplCurrentUnreplicatedParts() += QuantumParts;
- ReplCtx->MonGroup.ReplCurrentUnreplicatedBytes() += QuantumBytes;
-
+ if (!resume && (eof || RecoveryMachine->FullOfTasks() || QuantumBytes >= ReplCtx->VDiskCfg->ReplMaxQuantumBytes)) {
// the planning stage has finished, issue reply to the job actor
- Send(Recipient, new TEvReplPlanFinished(std::move(RecoveryMachine), StartKey, eof));
+ Send(Recipient, new TEvReplPlanFinished(std::move(RecoveryMachine), ReplyKey.value_or(TLogoBlobID()), eof));
// finish processing for this actor
PassAway();
@@ -138,19 +162,30 @@ namespace NKikimr {
NMatrix::TVectorType parts = ingress.PartsWeMustHaveLocally(&topology, ReplCtx->VCtx->ShortSelfVDisk,
key) - ingress.LocalParts(topology.GType);
- // scan for metadata parts
- for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) {
- const TLogoBlobID id(key, i + 1);
- if (!gtype.PartSize(id)) {
- parts.Clear(i);
- RecoveryMachine->AddMetadataPart(id);
+ if (!parts.Empty()) {
+ // scan for metadata parts
+ for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) {
+ const TLogoBlobID id(key, i + 1);
+ if (!gtype.PartSize(id)) {
+ parts.Clear(i);
+ RecoveryMachine->AddMetadataPart(id);
+ }
}
- }
- if (!parts.Empty()) {
const bool phantomLike = !status.KeepByBarrier && ReplInfo->DonorVDiskId == TVDiskID();
RecoveryMachine->AddTask(key, parts, phantomLike, ingress);
+ ReplInfo->WorkUnitsPlanned += key.BlobSize();
+ ReplInfo->WorkUnitsTotal += key.BlobSize();
+ ReplInfo->PhantomLike += phantomLike;
+
+ if (phantomLike) {
+ ++ReplCtx->MonGroup.ReplPhantomLikeDiscovered();
+ ReplCtx->MonGroup.ReplUnreplicatedPhantoms() = 1;
+ } else {
+ ReplCtx->MonGroup.ReplUnreplicatedNonPhantoms() = 1;
+ }
+
// calculate part size and total size to recover
for (ui8 partIdx = parts.FirstPosition(); partIdx != parts.GetSize(); partIdx = parts.NextPosition(partIdx)) {
QuantumBytes += gtype.PartSize(TLogoBlobID(key, partIdx + 1));
@@ -236,7 +271,10 @@ namespace NKikimr {
TVDiskProxySet DiskProxySet;
ui32 NumRunningProxies = 0;
- bool PhantomCheckPending = false;
+ using TPhantomCheck = std::tuple<TLogoBlobID, NMatrix::TVectorType>;
+ std::deque<TPhantomCheck> PhantomChecksPending;
+ std::unordered_multimap<ui64, TPhantomCheck> PhantomChecksInFlight;
+ ui32 LastPhantomCheckId = 0;
TDeque<TLogoBlobID> Phantoms;
THashSet<TChunkIdx> WrittenChunkIdxSet;
@@ -252,10 +290,15 @@ namespace NKikimr {
(LastKey, LastKey), (Eof, Eof));
if (!Phantoms.empty()) {
+ // remove all determined phantom blobs from the UnreplicatedBlobsPtr queue
+ std::sort(Phantoms.begin(), Phantoms.end());
+ auto pred = [this](const auto& id) { return std::binary_search(Phantoms.begin(), Phantoms.end(), id); };
+ UnreplicatedBlobsPtr->erase(std::remove_if(UnreplicatedBlobsPtr->begin(), UnreplicatedBlobsPtr->end(),
+ pred), UnreplicatedBlobsPtr->end());
+
STLOG(PRI_DEBUG, BS_REPL, BSVR06, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "sending phantoms"),
(NumPhantoms, Phantoms.size()));
- Send(ReplCtx->SkeletonId, new TEvDetectedPhantomBlob(std::move(Phantoms)));
- Phantoms.clear();
+ Send(ReplCtx->SkeletonId, new TEvDetectedPhantomBlob(std::exchange(Phantoms, {})));
}
bool dropDonor = true;
@@ -296,6 +339,12 @@ namespace NKikimr {
LastKey = ev->Get()->LastKey;
Eof = ev->Get()->Eof;
+ auto& mon = ReplCtx->MonGroup;
+ mon.ReplPhantomLikeDiscovered() += ReplInfo->PhantomLike;
+ Y_VERIFY_DEBUG_S(mon.ReplWorkUnitsRemaining() == 1 || ReplInfo->WorkUnitsTotal <= (ui64)mon.ReplWorkUnitsRemaining(),
+ "WorkUnitsTotal# " << ReplInfo->WorkUnitsTotal << " ReplWorkUnitsRemaining# " << mon.ReplWorkUnitsRemaining());
+ mon.ReplWorkUnitsRemaining() = ReplInfo->WorkUnitsTotal;
+
if (RecoveryMachine->NoTasks()) {
Finish();
return;
@@ -473,9 +522,9 @@ namespace NKikimr {
return false;
}
- if (PhantomCheckPending) {
- return false; // still waiting for proxy response about phantom validation
- }
+ { Y_DEFER {
+ RunPhantomChecks();
+ };
while (!MergeHeap.empty()) {
TimeAccount.SetState(ETimeState::MERGE);
@@ -544,21 +593,11 @@ namespace NKikimr {
}
// recover data
- TRecoveryMachine::EPhantomState phantom = TRecoveryMachine::EPhantomState::Unknown;
- RecoveryMachine->Recover(*CurrentKey, *CurrentParts, RecoveryQueue, phantom);
- if (phantom == TRecoveryMachine::EPhantomState::Check) {
+ NMatrix::TVectorType parts;
+ if (!RecoveryMachine->Recover(*CurrentKey, *CurrentParts, RecoveryQueue, parts)) {
STLOG(PRI_INFO, BS_REPL, BSVR33, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "Sending phantom validation query"),
(GroupId, GInfo->GroupID), (CurKey, *CurrentKey));
-
- auto ev = std::make_unique<TEvBlobStorage::TEvGet>(*CurrentKey, 0, 0, TInstant::Max(),
- NKikimrBlobStorage::EGetHandleClass::AsyncRead);
- ev->PhantomCheck = true;
- SendToBSProxy(SelfId(), GInfo->GroupID, ev.release());
-
- PhantomCheckPending = true;
-
- TimeAccount.SetState(ETimeState::PHANTOM);
- return false;
+ PhantomChecksPending.emplace_back(*CurrentKey, parts);
}
CurrentKey.reset();
CurrentParts.reset();
@@ -580,6 +619,14 @@ namespace NKikimr {
}
}
+ } // Y_DEFER
+
+ if (!PhantomChecksInFlight.empty()) {
+ TimeAccount.SetState(ETimeState::PHANTOM);
+ return false; // still waiting for proxy response about phantom validation
+ }
+ Y_VERIFY(PhantomChecksPending.empty());
+
Y_VERIFY(!NumRunningProxies && MergeHeap.empty() && RecoveryQueue.empty());
TimeAccount.SetState(ETimeState::OTHER);
@@ -614,27 +661,57 @@ namespace NKikimr {
Y_FAIL("incorrect merger state State# %" PRIu32, ui32(Writer.GetState()));
}
+ void RunPhantomChecks() {
+ while (!PhantomChecksPending.empty() && PhantomChecksInFlight.size() < 32) {
+ const ui64 cookie = ++LastPhantomCheckId;
+
+ size_t numItems = Min<size_t>(PhantomChecksPending.size(), 32);
+ TArrayHolder<TEvBlobStorage::TEvGet::TQuery> queries(new TEvBlobStorage::TEvGet::TQuery[numItems]);
+ for (size_t i = 0; i < numItems; ++i) {
+ auto& pending = PhantomChecksPending.front();
+ auto& [id, parts] = pending;
+ queries[i].Set(id);
+ PhantomChecksInFlight.emplace(cookie, pending);
+ PhantomChecksPending.pop_front();
+ }
+
+ auto ev = std::make_unique<TEvBlobStorage::TEvGet>(queries, numItems, TInstant::Max(),
+ NKikimrBlobStorage::EGetHandleClass::AsyncRead);
+ ev->PhantomCheck = true;
+ SendToBSProxy(SelfId(), GInfo->GroupID, ev.release(), cookie);
+ }
+ }
+
void Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
STLOG(PRI_INFO, BS_REPL, BSVR34, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "Received phantom validation reply"),
(Msg, ev->Get()->ToString()));
- Y_VERIFY(PhantomCheckPending);
- Y_VERIFY(CurrentKey);
- Y_VERIFY(CurrentParts);
- TRecoveryMachine::EPhantomState phantom = TRecoveryMachine::EPhantomState::NonPhantom;
+
+ auto [begin, end] = PhantomChecksInFlight.equal_range(ev->Cookie);
+ Y_VERIFY(begin != end);
+
+ std::unordered_map<TLogoBlobID, bool> isPhantom;
auto *msg = ev->Get();
- if (msg->Status == NKikimrProto::OK) {
- Y_VERIFY(msg->ResponseSz == 1);
- auto& r = msg->Responses[0];
- Y_VERIFY(r.Id == *CurrentKey);
- if (r.Status == NKikimrProto::NODATA) {
- Phantoms.push_back(r.Id);
- phantom = TRecoveryMachine::EPhantomState::Phantom;
+ for (size_t i = 0; i < msg->ResponseSz; ++i) {
+ auto& r = msg->Responses[i];
+ isPhantom.emplace(r.Id, r.Status == NKikimrProto::NODATA);
+ }
+
+ for (auto it = begin; it != end; ++it) {
+ const auto& [_, item] = *it;
+ const auto& [id, parts] = item;
+ const auto isPhantomIt = isPhantom.find(id);
+ Y_VERIFY(isPhantomIt != isPhantom.end());
+ const bool phantom = isPhantomIt->second;
+ isPhantom.erase(isPhantomIt);
+ RecoveryMachine->ProcessPhantomBlob(id, parts, phantom);
+ if (phantom) {
+ Phantoms.push_back(id);
}
}
- RecoveryMachine->Recover(*CurrentKey, *CurrentParts, RecoveryQueue, phantom);
- PhantomCheckPending = false;
- CurrentKey.reset();
- CurrentParts.reset();
+
+ PhantomChecksInFlight.erase(begin, end);
+ Y_VERIFY(isPhantom.empty());
+
Merge();
}
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
index 34be219abce..e23438fa3ce 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
@@ -71,6 +71,10 @@ namespace NKikimr {
PARAM_V(ReplicaOk);
PARAM_V(RecoveryScheduled);
PARAM_V(IgnoredDueToGC);
+ PARAM_V(WorkUnitsPlanned);
+ PARAM_V(WorkUnitsTotal);
+ PARAM_V(WorkUnitsProcessed);
+ PARAM_V(PhantomLike);
}
GROUP("Plan Execution Stats") {
PARAM_V(DataRecoverySuccess);
@@ -250,12 +254,9 @@ namespace NKikimr {
STLOG(PRI_DEBUG, BS_REPL, BSVR15, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "QUANTUM START"));
LastReplStart = TAppData::TimeProvider->Now();
- ReplCtx->MonGroup.ReplCurrentUnreplicatedParts() = 0;
- ReplCtx->MonGroup.ReplCurrentUnreplicatedBytes() = 0;
- ReplCtx->MonGroup.ReplCurrentPhantoms() = 0;
- ReplCtx->MonGroup.ReplCurrentNumUnrecoveredPhantomBlobs() = 0;
- ReplCtx->MonGroup.ReplCurrentNumUnrecoveredNonPhantomBlobs() = 0;
ReplCtx->MonGroup.ReplUnreplicatedVDisks() = 1;
+ ReplCtx->MonGroup.ReplWorkUnitsRemaining() = 1;
+ ReplCtx->MonGroup.ReplWorkUnitsDone() = 0;
Become(&TThis::StateRepl);
@@ -342,7 +343,6 @@ namespace NKikimr {
bool finished = false;
if (info->Eof) { // when it is the last quantum for some donor, rotate the blob sets
- ReplCtx->MonGroup.ReplUnreplicatedBlobs() = UnreplicatedBlobsPtr->size();
BlobsToReplicatePtr = std::move(UnreplicatedBlobsPtr);
UnreplicatedBlobsPtr = std::make_shared<TBlobIdQueue>();
if (BlobsToReplicatePtr->empty()) {
@@ -371,10 +371,6 @@ namespace NKikimr {
STLOG(PRI_DEBUG, BS_REPL, BSVR17, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "REPL COMPLETED"),
(BlobsToReplicate, BlobsToReplicatePtr->size()));
LastReplEnd = now;
- ReplCtx->MonGroup.ReplUnreplicatedBytes() = ReplCtx->MonGroup.ReplCurrentUnreplicatedBytes();
- ReplCtx->MonGroup.ReplPhantoms() = ReplCtx->MonGroup.ReplCurrentPhantoms();
- ReplCtx->MonGroup.ReplNumUnrecoveredPhantomBlobs() = ReplCtx->MonGroup.ReplCurrentNumUnrecoveredPhantomBlobs();
- ReplCtx->MonGroup.ReplNumUnrecoveredNonPhantomBlobs() = ReplCtx->MonGroup.ReplCurrentNumUnrecoveredNonPhantomBlobs();
if (State == WaitQueues || State == Replication) {
// release token as we have finished replicating
@@ -382,7 +378,7 @@ namespace NKikimr {
}
Become(&TThis::StateRelax);
- if (*BlobsToReplicatePtr) {
+ if (!BlobsToReplicatePtr->empty()) {
// try again for unreplicated blobs in some future
State = Relaxation;
Schedule(ReplCtx->VDiskCfg->ReplTimeInterval, new TEvents::TEvWakeup);
@@ -390,6 +386,10 @@ namespace NKikimr {
// no more blobs to replicate; replication will not resume
State = Finished;
ReplCtx->MonGroup.ReplUnreplicatedVDisks() = 0;
+ ReplCtx->MonGroup.ReplUnreplicatedPhantoms() = 1;
+ ReplCtx->MonGroup.ReplUnreplicatedNonPhantoms() = 1;
+ ReplCtx->MonGroup.ReplWorkUnitsRemaining() = 0;
+ ReplCtx->MonGroup.ReplWorkUnitsDone() = 0;
TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvReplDone, 0, ReplCtx->SkeletonId,
SelfId(), nullptr, 0));
}
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h
index 629d65bea97..ee6bb57e883 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h
@@ -11,7 +11,7 @@ namespace NKikimr {
struct TProxyStat;
};
- using TBlobIdQueue = TQueue<TLogoBlobID>;
+ using TBlobIdQueue = std::deque<TLogoBlobID>;
using TBlobIdQueuePtr = std::shared_ptr<TBlobIdQueue>;
////////////////////////////////////////////////////////////////////////////
@@ -33,6 +33,10 @@ namespace NKikimr {
ui64 ReplicaOk = 0;
ui64 RecoveryScheduled = 0;
ui64 IgnoredDueToGC = 0;
+ ui64 WorkUnitsPlanned = 0;
+ ui64 WorkUnitsTotal = 0;
+ ui64 WorkUnitsProcessed = 0;
+ ui64 PhantomLike = 0;
// plan execution stats
ui64 DataRecoverySuccess = 0;
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
index 3c13238ad54..9dbc1bb4f5f 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
@@ -118,16 +118,9 @@ namespace NKikimr {
, Arena(&TRopeArenaBackend::Allocate)
{}
- enum class EPhantomState {
- Unknown,
- Check,
- Phantom,
- NonPhantom,
- };
-
- void Recover(const TLogoBlobID& id, TPartSet& partSet, TRecoveredBlobsQueue& rbq, EPhantomState& phantom) {
+ bool Recover(const TLogoBlobID& id, TPartSet& partSet, TRecoveredBlobsQueue& rbq, NMatrix::TVectorType& parts) {
Y_VERIFY(!id.PartId());
- Y_VERIFY(PhantomCheckPending ? *PhantomCheckPending == id : (!LastRecoveredId || *LastRecoveredId < id));
+ Y_VERIFY(!LastRecoveredId || *LastRecoveredId < id);
LastRecoveredId = id;
RecoverMetadata(id, rbq);
@@ -140,7 +133,7 @@ namespace NKikimr {
if (LostVec.empty() || LostVec.front().Id != id) {
STLOG(PRI_ERROR, BS_REPL, BSVR27, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "blob not in LostVec"),
(BlobId, id));
- return;
+ return true;
}
const TLost& lost = LostVec.front();
@@ -148,7 +141,7 @@ namespace NKikimr {
const TBlobStorageGroupType groupType = ReplCtx->VCtx->Top->GType;
- const NMatrix::TVectorType parts = lost.PartsToRecover;
+ parts = lost.PartsToRecover;
ui32 partsSize = 0;
bool hasExactParts = false;
@@ -161,45 +154,28 @@ namespace NKikimr {
}
}
- bool countAsRecovered = false;
-
Y_VERIFY_DEBUG((partSet.PartSet.PartsMask >> groupType.TotalPartCount()) == 0);
const ui32 presentParts = PopCount(partSet.PartSet.PartsMask);
bool canRestore = presentParts >= groupType.MinimalRestorablePartCount();
- if (phantom == EPhantomState::Unknown && lost.PossiblePhantom && needToRestore && !canRestore) {
- phantom = EPhantomState::Check;
+ if (lost.PossiblePhantom && needToRestore && !canRestore) {
+ ReplInfo->DataRecoveryNoParts++;
+ ReplInfo->PartsMissing += parts.CountBits();
++ReplInfo->DataRecoveryPhantomCheck;
- PhantomCheckPending = id;
- return; // reentry expected with the check result
- } else {
- PhantomCheckPending.reset();
+ UnreplicatedBlobsPtr->push_back(id); // treat this blob as non-phantom by default, sort it out later
+ LostVec.pop_front();
+ return false;
}
// first of all, count present parts and recover only if there are enough of these parts
- if ((!canRestore && needToRestore && !hasExactParts) || phantom == EPhantomState::Phantom) {
+ if (!canRestore && needToRestore && !hasExactParts) {
ReplInfo->DataRecoveryNoParts++;
ReplInfo->PartsMissing += parts.CountBits();
STLOG(PRI_INFO, BS_REPL, BSVR28, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "not enough data parts to recover"),
(BlobId, id), (NumPresentParts, presentParts), (MinParts, groupType.DataParts()),
- (PartSet, partSet.ToString()), (IsPhantom, phantom == EPhantomState::Phantom),
- (IsNonPhantom, phantom == EPhantomState::NonPhantom), (PossiblePhantom, lost.PossiblePhantom),
- (Ingress, lost.Ingress.ToString(ReplCtx->VCtx->Top.get(), ReplCtx->VCtx->ShortSelfVDisk, id)));
- ++(phantom == EPhantomState::Phantom
- ? ReplCtx->MonGroup.ReplCurrentNumUnrecoveredPhantomBlobs()
- : ReplCtx->MonGroup.ReplCurrentNumUnrecoveredNonPhantomBlobs());
-
- if (phantom == EPhantomState::Phantom) {
- ++ReplCtx->MonGroup.ReplCurrentPhantoms();
-
- // count phantoms as replicated blobs
- countAsRecovered = true;
- for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) {
- partsSize += groupType.PartSize(TLogoBlobID(id, i + 1));
- }
- } else {
- UnreplicatedBlobsPtr->push(id);
- }
+ (PartSet, partSet.ToString()), (Ingress, lost.Ingress.ToString(ReplCtx->VCtx->Top.get(),
+ ReplCtx->VCtx->ShortSelfVDisk, id)));
+ UnreplicatedBlobsPtr->push_back(id);
} else {
// recover
try {
@@ -246,7 +222,7 @@ namespace NKikimr {
if (numMissingParts) {
// this blob is not fully replicated yet
- UnreplicatedBlobsPtr->push(id);
+ UnreplicatedBlobsPtr->push_back(id);
}
if (numSmallParts) {
@@ -259,26 +235,37 @@ namespace NKikimr {
ReplInfo->HugeLogoBlobsRecovered += numHuge;
ReplInfo->BytesRecovered += partsSize;
ReplInfo->PartsMissing += numMissingParts;
-
- // count recovered parts
- countAsRecovered = true;
-
ReplInfo->DataRecoverySuccess++;
+ ReplInfo->WorkUnitsProcessed += id.BlobSize();
+ Y_VERIFY_DEBUG(ReplInfo->WorkUnitsProcessed <= ReplInfo->WorkUnitsPlanned);
+
+ if (!numMissingParts) {
+ ReplCtx->MonGroup.ReplWorkUnitsDone() += id.BlobSize();
+ ReplCtx->MonGroup.ReplWorkUnitsRemaining() -= id.BlobSize();
+ }
+ if (lost.PossiblePhantom) {
+ ++ReplCtx->MonGroup.ReplPhantomLikeRecovered();
+ }
} catch (const std::exception& ex) {
++ReplCtx->MonGroup.ReplRecoveryGroupTypeErrors();
STLOG(PRI_ERROR, BS_REPL, BSVR29, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "recovery exception"),
(BlobId, id), (Error, TString(ex.what())));
ReplInfo->DataRecoveryFailure++;
- UnreplicatedBlobsPtr->push(id);
+ UnreplicatedBlobsPtr->push_back(id);
}
}
- if (countAsRecovered) {
- ReplCtx->MonGroup.ReplCurrentUnreplicatedBytes() -= partsSize;
- ReplCtx->MonGroup.ReplCurrentUnreplicatedParts() -= parts.CountBits();
- }
-
LostVec.pop_front();
+ return true;
+ }
+
+ void ProcessPhantomBlob(const TLogoBlobID& id, NMatrix::TVectorType parts, bool isPhantom) {
+ STLOG(PRI_INFO, BS_REPL, BSVR00, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "phantom check completed"),
+ (BlobId, id), (Parts, parts), (IsPhantom, isPhantom));
+
+ ++(isPhantom
+ ? ReplCtx->MonGroup.ReplPhantomLikeDropped()
+ : ReplCtx->MonGroup.ReplPhantomLikeUnrecovered());
}
// finish work
@@ -350,7 +337,6 @@ namespace NKikimr {
TDeque<TLogoBlobID> MetadataParts;
TRopeArena Arena;
std::optional<TLogoBlobID> LastRecoveredId;
- std::optional<TLogoBlobID> PhantomCheckPending;
void AddBlobToQueue(const TLogoBlobID& id, TRope blob, NMatrix::TVectorType parts, bool isHugeBlob,
TRecoveredBlobsQueue& rbq) {
@@ -393,7 +379,10 @@ namespace NKikimr {
STLOG(PRI_INFO, BS_REPL, BSVR31, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "TRecoveryMachine::SkipItem"),
(BlobId, item.Id));
++ReplInfo->DataRecoverySkip;
- UnreplicatedBlobsPtr->push(item.Id);
+ UnreplicatedBlobsPtr->push_back(item.Id);
+ if (item.PossiblePhantom) {
+ ++ReplCtx->MonGroup.ReplPhantomLikeUnrecovered();
+ }
}
};
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
index 83d376047b0..fe835e0eacb 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
@@ -86,6 +86,7 @@ namespace NKikimr {
auto groupInfo = MakeIntrusive<TBlobStorageGroupInfo>(TBlobStorageGroupType::Erasure4Plus2Block);
auto replCtx = CreateReplCtx(vdisks, groupInfo);
auto info = MakeIntrusive<TEvReplFinished::TInfo>();
+ info->WorkUnitsPlanned = Max<ui64>();
TBlobIdQueuePtr unreplicatedBlobsPtr = std::make_shared<TBlobIdQueue>();
NRepl::TRecoveryMachine m(replCtx, info, unreplicatedBlobsPtr);
TMap<TLogoBlobID, TVector<TString>> data = GenerateData(10000, 1024, groupInfo, vdisks);
@@ -148,9 +149,9 @@ namespace NKikimr {
p.AddData(0, TLogoBlobID(id, partIndex + 1), NKikimrProto::OK, v[i]);
}
NRepl::TRecoveryMachine::TRecoveredBlobsQueue rbq;
- NRepl::TRecoveryMachine::EPhantomState phantom = NRepl::TRecoveryMachine::EPhantomState::Unknown;
- m.Recover(id, p, rbq, phantom);
- Y_VERIFY(phantom == NRepl::TRecoveryMachine::EPhantomState::Unknown);
+ NMatrix::TVectorType parts;
+ const bool success = m.Recover(id, p, rbq, parts);
+ Y_VERIFY(success);
ui8 partIndex;
for (partIndex = 0; partIndex < groupInfo->Type.BlobSubgroupSize(); ++partIndex) {
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index afd133c51b0..dc85118ddfd 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -86,15 +86,13 @@ namespace NKikimr {
}
ctx.Send(*SkeletonFrontIDPtr, ev.release());
// send VDisk's metric to NodeWarden
- if (OverloadHandler) {
- ctx.Send(NodeWardenServiceId,
- new TEvBlobStorage::TEvControllerUpdateDiskStatus(
- SelfVDiskId,
- OverloadHandler->GetIntegralRankPercent(),
- SelfId().NodeId(),
- Config->BaseInfo.PDiskId,
- Config->BaseInfo.VDiskSlotId));
- }
+ ctx.Send(NodeWardenServiceId,
+ new TEvBlobStorage::TEvControllerUpdateDiskStatus(
+ SelfVDiskId,
+ OverloadHandler ? OverloadHandler->GetIntegralRankPercent() : 0,
+ SelfId().NodeId(),
+ Config->BaseInfo.PDiskId,
+ Config->BaseInfo.VDiskSlotId));
// repeat later
ctx.Schedule(Config->WhiteboardUpdateInterval, new TEvTimeToUpdateWhiteboard());
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
index 3cd92fbe56d..354302d1d02 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
@@ -903,10 +903,8 @@ namespace NKikimr {
const auto state = VDiskMonGroup.VDiskState();
// replicated?
bool replicated = !ReplMonGroup.ReplUnreplicatedVDisks() && !HasUnreadableBlobs;
- bool unreplicatedPhantoms = ReplMonGroup.ReplCurrentNumUnrecoveredPhantomBlobs() +
- ReplMonGroup.ReplNumUnrecoveredPhantomBlobs();
- bool unreplicatedNonPhantoms = ReplMonGroup.ReplCurrentNumUnrecoveredNonPhantomBlobs() +
- ReplMonGroup.ReplNumUnrecoveredNonPhantomBlobs();
+ bool unreplicatedPhantoms = ReplMonGroup.ReplUnreplicatedPhantoms();
+ bool unreplicatedNonPhantoms = ReplMonGroup.ReplUnreplicatedNonPhantoms();
// unsynced VDisks
ui64 unsyncedVDisks = SyncerMonGroup.SyncerUnsyncedDisks();
// calculate cumulative status of Skeleton Front overload
@@ -917,8 +915,16 @@ namespace NKikimr {
light = Max(light, queue->GetCumulativeLight());
}
// send a message to Whiteboard
- ctx.Send(SelfId(), new NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate(state, outOfSpaceFlags,
- replicated, unreplicatedPhantoms, unreplicatedNonPhantoms, unsyncedVDisks, light, HasUnreadableBlobs));
+ auto ev = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate>(state, outOfSpaceFlags,
+ replicated, unreplicatedPhantoms, unreplicatedNonPhantoms, unsyncedVDisks, light, HasUnreadableBlobs);
+ if (ReplMonGroup.ReplUnreplicatedVDisks()) {
+ const ui64 a = ReplMonGroup.ReplWorkUnitsDone();
+ const ui64 b = ReplMonGroup.ReplWorkUnitsRemaining();
+ ev->Record.SetReplicationProgress((double)a / (a + b));
+ } else {
+ ev->Record.SetReplicationProgress(1.0);
+ }
+ ctx.Send(SelfId(), ev.release());
// repeat later
if (schedule) {
ctx.Schedule(Config->WhiteboardUpdateInterval, new TEvTimeToUpdateWhiteboard);
diff --git a/ydb/core/protos/node_whiteboard.proto b/ydb/core/protos/node_whiteboard.proto
index b15e8b09929..4e44eea60c9 100644
--- a/ydb/core/protos/node_whiteboard.proto
+++ b/ydb/core/protos/node_whiteboard.proto
@@ -182,6 +182,8 @@ message TVDiskStateInfo {
optional bool UnreplicatedPhantoms = 20 [default = false];
// The same for the non-phantom-like blobs.
optional bool UnreplicatedNonPhantoms = 21 [default = false];
+ // Replication progress (0 to 1). Only for replication, not blob scrubbing.
+ optional float ReplicationProgress = 30;
// How many unsynced VDisks from current BlobStorage group we see
optional uint64 UnsyncedVDisks = 15 [default = 0];
// How much this VDisk have allocated on corresponding PDisk