aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-11-15 20:38:59 +0300
committeralexvru <alexvru@ydb.tech>2023-11-15 21:39:49 +0300
commitcb57bb54c10bb28c7623fa3f6e4d928172d9c17e (patch)
tree34f71bd2839b7c3ec5de6011b4b657eb7e2cefc3
parentf2ed702991f7f223ce4e47c08976553dd9a3c6cc (diff)
downloadydb-cb57bb54c10bb28c7623fa3f6e4d928172d9c17e.tar.gz
Fix replication counters KIKIMR-20008
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp41
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp3
2 files changed, 18 insertions, 26 deletions
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
index c1d4c81ffb..2486d9530d 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
@@ -25,11 +25,14 @@ namespace NKikimr {
std::unique_ptr<TRecoveryMachine> RecoveryMachine;
TLogoBlobID LastKey;
bool Eof;
+ std::deque<TLogoBlobID> DroppedBlobs;
- TEvReplPlanFinished(std::unique_ptr<TRecoveryMachine>&& recoveryMachine, const TLogoBlobID& lastKey, bool eof)
+ TEvReplPlanFinished(std::unique_ptr<TRecoveryMachine>&& recoveryMachine, const TLogoBlobID& lastKey, bool eof,
+ std::deque<TLogoBlobID>&& droppedBlobs)
: RecoveryMachine(std::move(recoveryMachine))
, LastKey(lastKey)
, Eof(eof)
+ , DroppedBlobs(std::move(droppedBlobs))
{}
};
@@ -46,6 +49,7 @@ namespace NKikimr {
TEvReplFinished::TInfoPtr ReplInfo;
TBlobIdQueuePtr BlobsToReplicatePtr;
TBlobIdQueuePtr UnreplicatedBlobsPtr;
+ std::deque<TLogoBlobID> DroppedBlobs;
ui64 QuantumBytes = 0;
bool AddingTasks = true;
@@ -91,8 +95,10 @@ namespace NKikimr {
} else {
const TLogoBlobID& key = BlobsToReplicatePtr->front();
it.Seek(key);
- if (it.Valid() && it.GetCurKey().LogoBlobID() == key) {
+ const bool processed = it.Valid() && it.GetCurKey().LogoBlobID() == key &&
ProcessItem(it, *barriers, allowKeepFlags);
+ if (!processed) {
+ DroppedBlobs.push_back(key);
}
}
}
@@ -141,13 +147,14 @@ namespace NKikimr {
}
// the planning stage has finished, issue reply to the job actor
- Send(Recipient, new TEvReplPlanFinished(std::move(RecoveryMachine), KeyToResumeNextTime.value_or(TLogoBlobID()), eof));
+ Send(Recipient, new TEvReplPlanFinished(std::move(RecoveryMachine), KeyToResumeNextTime.value_or(TLogoBlobID()),
+ eof, std::move(DroppedBlobs)));
// finish processing for this actor
PassAway();
}
- void ProcessItem(const TLogoBlobsSnapshot::TIndexForwardIterator& it,
+ bool ProcessItem(const TLogoBlobsSnapshot::TIndexForwardIterator& it,
TBarriersSnapshot::TBarriersEssence& barriers, bool allowKeepFlags) {
// aliases for convenient access
const TBlobStorageGroupInfo::TTopology& topology = *ReplCtx->VCtx->Top;
@@ -158,13 +165,13 @@ namespace NKikimr {
NMatrix::TVectorType parts = ingress.PartsWeMustHaveLocally(&topology, ReplCtx->VCtx->ShortSelfVDisk,
key) - ingress.LocalParts(topology.GType);
if (parts.Empty()) {
- return; // nothing to recover
+ return false; // nothing to recover
}
const NGc::TKeepStatus status = barriers.Keep(key, it.GetMemRec(), it.GetMemRecsMerged(), allowKeepFlags,
true /*allowGarbageCollection*/);
if (!status.KeepData) {
- return; // no need to recover
+ return false; // no need to recover
}
// scan for metadata parts
@@ -200,6 +207,8 @@ namespace NKikimr {
if (RecoveryMachine->FullOfTasks() || QuantumBytes >= ReplCtx->VDiskCfg->ReplMaxQuantumBytes) {
AddingTasks = false;
}
+
+ return true;
}
STRICT_STFUNC(StateFunc,
@@ -323,12 +332,6 @@ namespace NKikimr {
void Bootstrap() {
STLOG(PRI_DEBUG, BS_REPL, BSVR02, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "THullReplJobActor::Bootstrap"));
- if (BlobsToReplicatePtr) {
- for (const TLogoBlobID& id : *BlobsToReplicatePtr) {
- Y_ABORT_UNLESS(UnreplicatedBlobRecords.contains(id));
- }
- }
-
TimeAccount.SetState(ETimeState::PREPARE_PLAN);
auto actor = std::make_unique<THullReplPlannerActor>(ReplCtx, GInfo, StartKey, ReplInfo, BlobsToReplicatePtr, UnreplicatedBlobsPtr);
auto aid = RunInBatchPool(ActorContext(), actor.release());
@@ -343,18 +346,8 @@ namespace NKikimr {
LastKey = ev->Get()->LastKey;
Eof = ev->Get()->Eof;
- if (BlobsToReplicatePtr) {
- TUnreplicatedBlobRecords temp;
- RecoveryMachine->ForEach([&](TLogoBlobID id, NMatrix::TVectorType /*partsToRecover*/, TIngress /*ingress*/) {
- auto nh = UnreplicatedBlobRecords.extract(id);
- Y_ABORT_UNLESS(nh);
- temp.insert(std::move(nh));
- });
- for (auto it = UnreplicatedBlobRecords.begin(); it != UnreplicatedBlobRecords.end(); ) {
- const auto& [key, value] = *it++;
- DropUnreplicatedBlobRecord(key);
- }
- UnreplicatedBlobRecords = std::move(temp);
+ for (const TLogoBlobID& id : ev->Get()->DroppedBlobs) {
+ DropUnreplicatedBlobRecord(id);
}
auto& mon = ReplCtx->MonGroup;
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
index a4ce0107c9..3c88c998a3 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
@@ -349,8 +349,7 @@ namespace NKikimr {
bool finished = false;
if (info->Eof) { // when it is the last quantum for some donor, rotate the blob sets
- BlobsToReplicatePtr = std::move(UnreplicatedBlobsPtr);
- UnreplicatedBlobsPtr = std::make_shared<TBlobIdQueue>();
+ BlobsToReplicatePtr = std::exchange(UnreplicatedBlobsPtr, std::make_shared<TBlobIdQueue>());
#ifndef NDEBUG
Y_VERIFY_DEBUG_S(BlobsToReplicatePtr->size() == UnreplicatedBlobRecords.size(),