summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <[email protected]>2024-07-10 13:52:02 +0300
committerGitHub <[email protected]>2024-07-10 13:52:02 +0300
commitd32d5d0eb8e88a69698721f39935990a6f0d3085 (patch)
tree17d2b9af0e762f375be296583157b2923dea91a1
parent456c93bd184bd7bc77875e93d5a6a3784a87eeae (diff)
Improve replication code (#6501)
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp28
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp12
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h38
3 files changed, 54 insertions, 24 deletions
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
index 58a54e939d0..b77498cc6c1 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
@@ -58,10 +58,8 @@ namespace NKikimr {
Recipient = parentId;
// count unreplicated so far blobs in this work too
- for (const TLogoBlobID& id : *UnreplicatedBlobsPtr) {
- ReplInfo->WorkUnitsTotal += id.BlobSize();
- }
- ReplInfo->ItemsTotal += UnreplicatedBlobsPtr->size();
+ ReplInfo->WorkUnitsTotal += UnreplicatedBlobsPtr->GetNumWorkUnits();
+ ReplInfo->ItemsTotal += UnreplicatedBlobsPtr->GetNumItems();
// prepare the recovery machine
RecoveryMachine = std::make_unique<TRecoveryMachine>(ReplCtx, ReplInfo);
@@ -88,12 +86,12 @@ namespace NKikimr {
if (BlobsToReplicatePtr) {
// iterate over queue items and match them with iterator
- for (; !BlobsToReplicatePtr->empty() && AddingTasks; BlobsToReplicatePtr->pop_front()) {
+ for (; !BlobsToReplicatePtr->IsEmpty() && AddingTasks; BlobsToReplicatePtr->PopFront()) {
if (++counter % 1024 == 0 && GetCycleCountFast() >= plannedEndTime) {
Send(ReplCtx->SkeletonId, new TEvTakeHullSnapshot(true));
return;
} else {
- const TLogoBlobID& key = BlobsToReplicatePtr->front();
+ const TLogoBlobID& key = BlobsToReplicatePtr->Front();
it.Seek(key);
const bool processed = it.Valid() && it.GetCurKey().LogoBlobID() == key &&
ProcessItem(it, *barriers, allowKeepFlags);
@@ -102,13 +100,9 @@ namespace NKikimr {
}
}
}
- if (!AddingTasks) {
- for (const TLogoBlobID& key : *BlobsToReplicatePtr) {
- ReplInfo->WorkUnitsTotal += key.BlobSize();
- }
- ReplInfo->ItemsTotal += BlobsToReplicatePtr->size();
- }
- eof = BlobsToReplicatePtr->empty();
+ ReplInfo->WorkUnitsTotal += BlobsToReplicatePtr->GetNumWorkUnits();
+ ReplInfo->ItemsTotal += BlobsToReplicatePtr->GetNumItems();
+ eof = BlobsToReplicatePtr->IsEmpty();
} else {
// scan through the index until we have enough blobs to recover or the time is out
const TBlobStorageGroupInfo::TTopology& topology = *ReplCtx->VCtx->Top;
@@ -360,8 +354,8 @@ namespace NKikimr {
(ReplItemsRemaining, (ui64)mon.ReplItemsRemaining()),
(LastKey, LastKey),
(Eof, Eof),
- (BlobsToReplicatePtr.size, ssize_t(BlobsToReplicatePtr ? BlobsToReplicatePtr->size() : (ssize_t)-1)),
- (UnreplicatedBlobsPtr.size, UnreplicatedBlobsPtr->size()));
+ (BlobsToReplicatePtr.size, ssize_t(BlobsToReplicatePtr ? BlobsToReplicatePtr->GetNumItems() : (ssize_t)-1)),
+ (UnreplicatedBlobsPtr.size, UnreplicatedBlobsPtr->GetNumItems()));
}
mon.ReplWorkUnitsRemaining() = ReplInfo->WorkUnitsTotal;
@@ -651,7 +645,7 @@ namespace NKikimr {
(RecoveryQueueSize, RecoveryQueue.size()));
// sort unreplicated blobs vector as it may contain records in incorrect order due to phantom checking
- std::sort(UnreplicatedBlobsPtr->begin(), UnreplicatedBlobsPtr->end());
+ UnreplicatedBlobsPtr->Sort();
return true;
}
@@ -762,7 +756,7 @@ namespace NKikimr {
} else if (record.LooksLikePhantom) {
++ReplCtx->MonGroup.ReplPhantomBlobsWithProblems();
}
- UnreplicatedBlobsPtr->push_back(item.Id);
+ UnreplicatedBlobsPtr->Push(item.Id);
}
void DropUnreplicatedBlobRecord(const TLogoBlobID& id) {
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
index 45f7f4250a0..c7429732009 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
@@ -367,15 +367,15 @@ namespace NKikimr {
BlobsToReplicatePtr = std::exchange(UnreplicatedBlobsPtr, std::make_shared<TBlobIdQueue>());
#ifndef NDEBUG
- Y_VERIFY_DEBUG_S(BlobsToReplicatePtr->size() == UnreplicatedBlobRecords.size(),
- "BlobsToReplicatePtr->size# " << BlobsToReplicatePtr->size()
+ Y_VERIFY_DEBUG_S(BlobsToReplicatePtr->GetNumItems() == UnreplicatedBlobRecords.size(),
+ "BlobsToReplicatePtr->size# " << BlobsToReplicatePtr->GetNumItems()
<< " UnreplicatedBlobRecords.size# " << UnreplicatedBlobRecords.size());
- for (const TLogoBlobID& id : *BlobsToReplicatePtr) {
+ for (const TLogoBlobID& id : BlobsToReplicatePtr->Queue) {
Y_DEBUG_ABORT_UNLESS(UnreplicatedBlobRecords.contains(id));
}
#endif
- if (BlobsToReplicatePtr->empty()) {
+ if (BlobsToReplicatePtr->IsEmpty()) {
// no more blobs to replicate -- consider replication finished
finished = true;
for (const auto& donor : std::exchange(DonorQueue, {})) {
@@ -402,7 +402,7 @@ namespace NKikimr {
if (finished) {
STLOG(PRI_DEBUG, BS_REPL, BSVR17, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "REPL COMPLETED"),
- (BlobsToReplicate, BlobsToReplicatePtr->size()));
+ (BlobsToReplicate, BlobsToReplicatePtr->GetNumItems()));
LastReplEnd = now;
if (State == WaitQueues || State == Replication) {
@@ -412,7 +412,7 @@ namespace NKikimr {
ResetReplProgressTimer(true);
Become(&TThis::StateRelax);
- if (!BlobsToReplicatePtr->empty()) {
+ if (!BlobsToReplicatePtr->IsEmpty()) {
// try again for unreplicated blobs in some future
State = Relaxation;
Schedule(ReplCtx->VDiskCfg->ReplTimeInterval, new TEvents::TEvWakeup);
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h
index 00c0a792555..acfcbc6d74b 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h
@@ -11,7 +11,43 @@ namespace NKikimr {
struct TProxyStat;
};
- using TBlobIdQueue = std::deque<TLogoBlobID>;
+ struct TBlobIdQueue {
+ std::deque<TLogoBlobID> Queue;
+ ui64 WorkUnits = 0;
+
+ void Push(const TLogoBlobID& id) {
+ WorkUnits += id.BlobSize();
+ Queue.push_back(id);
+ }
+
+ void PopFront() {
+ WorkUnits -= Queue.front().BlobSize();
+ Queue.pop_front();
+ }
+
+ bool IsEmpty() const {
+ return Queue.empty();
+ }
+
+ TLogoBlobID Front() const {
+ return Queue.front();
+ }
+
+ size_t GetNumItems() const {
+ return Queue.size();
+ }
+
+ ui64 GetNumWorkUnits() const {
+ return WorkUnits;
+ }
+
+ void Sort() {
+ if (!std::is_sorted(Queue.begin(), Queue.end())) {
+ std::sort(Queue.begin(), Queue.end());
+ }
+ }
+ };
+
using TBlobIdQueuePtr = std::shared_ptr<TBlobIdQueue>;
struct TUnreplicatedBlobRecord { // for monitoring purposes