diff options
author | alexvru <alexvru@ydb.tech> | 2023-11-15 20:38:59 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-11-15 21:39:49 +0300 |
commit | cb57bb54c10bb28c7623fa3f6e4d928172d9c17e (patch) | |
tree | 34f71bd2839b7c3ec5de6011b4b657eb7e2cefc3 | |
parent | f2ed702991f7f223ce4e47c08976553dd9a3c6cc (diff) | |
download | ydb-cb57bb54c10bb28c7623fa3f6e4d928172d9c17e.tar.gz |
Fix replication counters KIKIMR-20008
-rw-r--r-- | ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp | 41 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp | 3 |
2 files changed, 18 insertions, 26 deletions
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp index c1d4c81ffb..2486d9530d 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp @@ -25,11 +25,14 @@ namespace NKikimr { std::unique_ptr<TRecoveryMachine> RecoveryMachine; TLogoBlobID LastKey; bool Eof; + std::deque<TLogoBlobID> DroppedBlobs; - TEvReplPlanFinished(std::unique_ptr<TRecoveryMachine>&& recoveryMachine, const TLogoBlobID& lastKey, bool eof) + TEvReplPlanFinished(std::unique_ptr<TRecoveryMachine>&& recoveryMachine, const TLogoBlobID& lastKey, bool eof, + std::deque<TLogoBlobID>&& droppedBlobs) : RecoveryMachine(std::move(recoveryMachine)) , LastKey(lastKey) , Eof(eof) + , DroppedBlobs(std::move(droppedBlobs)) {} }; @@ -46,6 +49,7 @@ namespace NKikimr { TEvReplFinished::TInfoPtr ReplInfo; TBlobIdQueuePtr BlobsToReplicatePtr; TBlobIdQueuePtr UnreplicatedBlobsPtr; + std::deque<TLogoBlobID> DroppedBlobs; ui64 QuantumBytes = 0; bool AddingTasks = true; @@ -91,8 +95,10 @@ namespace NKikimr { } else { const TLogoBlobID& key = BlobsToReplicatePtr->front(); it.Seek(key); - if (it.Valid() && it.GetCurKey().LogoBlobID() == key) { + const bool processed = it.Valid() && it.GetCurKey().LogoBlobID() == key && ProcessItem(it, *barriers, allowKeepFlags); + if (!processed) { + DroppedBlobs.push_back(key); } } } @@ -141,13 +147,14 @@ namespace NKikimr { } // the planning stage has finished, issue reply to the job actor - Send(Recipient, new TEvReplPlanFinished(std::move(RecoveryMachine), KeyToResumeNextTime.value_or(TLogoBlobID()), eof)); + Send(Recipient, new TEvReplPlanFinished(std::move(RecoveryMachine), KeyToResumeNextTime.value_or(TLogoBlobID()), + eof, std::move(DroppedBlobs))); // finish processing for this actor PassAway(); } - void ProcessItem(const TLogoBlobsSnapshot::TIndexForwardIterator& it, + bool ProcessItem(const TLogoBlobsSnapshot::TIndexForwardIterator& it, TBarriersSnapshot::TBarriersEssence& barriers, bool allowKeepFlags) { // aliases for convenient access const TBlobStorageGroupInfo::TTopology& topology = *ReplCtx->VCtx->Top; @@ -158,13 +165,13 @@ namespace NKikimr { NMatrix::TVectorType parts = ingress.PartsWeMustHaveLocally(&topology, ReplCtx->VCtx->ShortSelfVDisk, key) - ingress.LocalParts(topology.GType); if (parts.Empty()) { - return; // nothing to recover + return false; // nothing to recover } const NGc::TKeepStatus status = barriers.Keep(key, it.GetMemRec(), it.GetMemRecsMerged(), allowKeepFlags, true /*allowGarbageCollection*/); if (!status.KeepData) { - return; // no need to recover + return false; // no need to recover } // scan for metadata parts @@ -200,6 +207,8 @@ namespace NKikimr { if (RecoveryMachine->FullOfTasks() || QuantumBytes >= ReplCtx->VDiskCfg->ReplMaxQuantumBytes) { AddingTasks = false; } + + return true; } STRICT_STFUNC(StateFunc, @@ -323,12 +332,6 @@ namespace NKikimr { void Bootstrap() { STLOG(PRI_DEBUG, BS_REPL, BSVR02, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "THullReplJobActor::Bootstrap")); - if (BlobsToReplicatePtr) { - for (const TLogoBlobID& id : *BlobsToReplicatePtr) { - Y_ABORT_UNLESS(UnreplicatedBlobRecords.contains(id)); - } - } - TimeAccount.SetState(ETimeState::PREPARE_PLAN); auto actor = std::make_unique<THullReplPlannerActor>(ReplCtx, GInfo, StartKey, ReplInfo, BlobsToReplicatePtr, UnreplicatedBlobsPtr); auto aid = RunInBatchPool(ActorContext(), actor.release()); @@ -343,18 +346,8 @@ namespace NKikimr { LastKey = ev->Get()->LastKey; Eof = ev->Get()->Eof; - if (BlobsToReplicatePtr) { - TUnreplicatedBlobRecords temp; - RecoveryMachine->ForEach([&](TLogoBlobID id, NMatrix::TVectorType /*partsToRecover*/, TIngress /*ingress*/) { - auto nh = UnreplicatedBlobRecords.extract(id); - Y_ABORT_UNLESS(nh); - temp.insert(std::move(nh)); - }); - for (auto it = UnreplicatedBlobRecords.begin(); it != UnreplicatedBlobRecords.end(); ) { - const auto& [key, value] = *it++; - DropUnreplicatedBlobRecord(key); - } - UnreplicatedBlobRecords = std::move(temp); + for (const TLogoBlobID& id : ev->Get()->DroppedBlobs) { + DropUnreplicatedBlobRecord(id); } auto& mon = ReplCtx->MonGroup; diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp index a4ce0107c9..3c88c998a3 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp @@ -349,8 +349,7 @@ namespace NKikimr { bool finished = false; if (info->Eof) { // when it is the last quantum for some donor, rotate the blob sets - BlobsToReplicatePtr = std::move(UnreplicatedBlobsPtr); - UnreplicatedBlobsPtr = std::make_shared<TBlobIdQueue>(); + BlobsToReplicatePtr = std::exchange(UnreplicatedBlobsPtr, std::make_shared<TBlobIdQueue>()); #ifndef NDEBUG Y_VERIFY_DEBUG_S(BlobsToReplicatePtr->size() == UnreplicatedBlobRecords.size(), |