diff options
| author | Alexander Rutkovsky <[email protected]> | 2024-07-10 13:52:02 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-07-10 13:52:02 +0300 |
| commit | d32d5d0eb8e88a69698721f39935990a6f0d3085 (patch) | |
| tree | 17d2b9af0e762f375be296583157b2923dea91a1 | |
| parent | 456c93bd184bd7bc77875e93d5a6a3784a87eeae (diff) | |
Improve replication code (#6501)
| -rw-r--r-- | ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp | 28 | ||||
| -rw-r--r-- | ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp | 12 | ||||
| -rw-r--r-- | ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h | 38 |
3 files changed, 54 insertions, 24 deletions
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp index 58a54e939d0..b77498cc6c1 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp @@ -58,10 +58,8 @@ namespace NKikimr { Recipient = parentId; // count unreplicated so far blobs in this work too - for (const TLogoBlobID& id : *UnreplicatedBlobsPtr) { - ReplInfo->WorkUnitsTotal += id.BlobSize(); - } - ReplInfo->ItemsTotal += UnreplicatedBlobsPtr->size(); + ReplInfo->WorkUnitsTotal += UnreplicatedBlobsPtr->GetNumWorkUnits(); + ReplInfo->ItemsTotal += UnreplicatedBlobsPtr->GetNumItems(); // prepare the recovery machine RecoveryMachine = std::make_unique<TRecoveryMachine>(ReplCtx, ReplInfo); @@ -88,12 +86,12 @@ namespace NKikimr { if (BlobsToReplicatePtr) { // iterate over queue items and match them with iterator - for (; !BlobsToReplicatePtr->empty() && AddingTasks; BlobsToReplicatePtr->pop_front()) { + for (; !BlobsToReplicatePtr->IsEmpty() && AddingTasks; BlobsToReplicatePtr->PopFront()) { if (++counter % 1024 == 0 && GetCycleCountFast() >= plannedEndTime) { Send(ReplCtx->SkeletonId, new TEvTakeHullSnapshot(true)); return; } else { - const TLogoBlobID& key = BlobsToReplicatePtr->front(); + const TLogoBlobID& key = BlobsToReplicatePtr->Front(); it.Seek(key); const bool processed = it.Valid() && it.GetCurKey().LogoBlobID() == key && ProcessItem(it, *barriers, allowKeepFlags); @@ -102,13 +100,9 @@ namespace NKikimr { } } } - if (!AddingTasks) { - for (const TLogoBlobID& key : *BlobsToReplicatePtr) { - ReplInfo->WorkUnitsTotal += key.BlobSize(); - } - ReplInfo->ItemsTotal += BlobsToReplicatePtr->size(); - } - eof = BlobsToReplicatePtr->empty(); + ReplInfo->WorkUnitsTotal += BlobsToReplicatePtr->GetNumWorkUnits(); + ReplInfo->ItemsTotal += BlobsToReplicatePtr->GetNumItems(); + eof = BlobsToReplicatePtr->IsEmpty(); } else { // scan through the index until we have enough blobs to recover or the time is out const TBlobStorageGroupInfo::TTopology& topology = *ReplCtx->VCtx->Top; @@ -360,8 +354,8 @@ namespace NKikimr { (ReplItemsRemaining, (ui64)mon.ReplItemsRemaining()), (LastKey, LastKey), (Eof, Eof), - (BlobsToReplicatePtr.size, ssize_t(BlobsToReplicatePtr ? BlobsToReplicatePtr->size() : (ssize_t)-1)), - (UnreplicatedBlobsPtr.size, UnreplicatedBlobsPtr->size())); + (BlobsToReplicatePtr.size, ssize_t(BlobsToReplicatePtr ? BlobsToReplicatePtr->GetNumItems() : (ssize_t)-1)), + (UnreplicatedBlobsPtr.size, UnreplicatedBlobsPtr->GetNumItems())); } mon.ReplWorkUnitsRemaining() = ReplInfo->WorkUnitsTotal; @@ -651,7 +645,7 @@ namespace NKikimr { (RecoveryQueueSize, RecoveryQueue.size())); // sort unreplicated blobs vector as it may contain records in incorrect order due to phantom checking - std::sort(UnreplicatedBlobsPtr->begin(), UnreplicatedBlobsPtr->end()); + UnreplicatedBlobsPtr->Sort(); return true; } @@ -762,7 +756,7 @@ namespace NKikimr { } else if (record.LooksLikePhantom) { ++ReplCtx->MonGroup.ReplPhantomBlobsWithProblems(); } - UnreplicatedBlobsPtr->push_back(item.Id); + UnreplicatedBlobsPtr->Push(item.Id); } void DropUnreplicatedBlobRecord(const TLogoBlobID& id) { diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp index 45f7f4250a0..c7429732009 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp @@ -367,15 +367,15 @@ namespace NKikimr { BlobsToReplicatePtr = std::exchange(UnreplicatedBlobsPtr, std::make_shared<TBlobIdQueue>()); #ifndef NDEBUG - Y_VERIFY_DEBUG_S(BlobsToReplicatePtr->size() == UnreplicatedBlobRecords.size(), - "BlobsToReplicatePtr->size# " << BlobsToReplicatePtr->size() + Y_VERIFY_DEBUG_S(BlobsToReplicatePtr->GetNumItems() == UnreplicatedBlobRecords.size(), + "BlobsToReplicatePtr->size# " << BlobsToReplicatePtr->GetNumItems() << " UnreplicatedBlobRecords.size# " << UnreplicatedBlobRecords.size()); - for (const TLogoBlobID& id : *BlobsToReplicatePtr) { + for (const TLogoBlobID& id : BlobsToReplicatePtr->Queue) { Y_DEBUG_ABORT_UNLESS(UnreplicatedBlobRecords.contains(id)); } #endif - if (BlobsToReplicatePtr->empty()) { + if (BlobsToReplicatePtr->IsEmpty()) { // no more blobs to replicate -- consider replication finished finished = true; for (const auto& donor : std::exchange(DonorQueue, {})) { @@ -402,7 +402,7 @@ namespace NKikimr { if (finished) { STLOG(PRI_DEBUG, BS_REPL, BSVR17, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "REPL COMPLETED"), - (BlobsToReplicate, BlobsToReplicatePtr->size())); + (BlobsToReplicate, BlobsToReplicatePtr->GetNumItems())); LastReplEnd = now; if (State == WaitQueues || State == Replication) { @@ -412,7 +412,7 @@ namespace NKikimr { ResetReplProgressTimer(true); Become(&TThis::StateRelax); - if (!BlobsToReplicatePtr->empty()) { + if (!BlobsToReplicatePtr->IsEmpty()) { // try again for unreplicated blobs in some future State = Relaxation; Schedule(ReplCtx->VDiskCfg->ReplTimeInterval, new TEvents::TEvWakeup); diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h index 00c0a792555..acfcbc6d74b 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h @@ -11,7 +11,43 @@ namespace NKikimr { struct TProxyStat; }; - using TBlobIdQueue = std::deque<TLogoBlobID>; + struct TBlobIdQueue { + std::deque<TLogoBlobID> Queue; + ui64 WorkUnits = 0; + + void Push(const TLogoBlobID& id) { + WorkUnits += id.BlobSize(); + Queue.push_back(id); + } + + void PopFront() { + WorkUnits -= Queue.front().BlobSize(); + Queue.pop_front(); + } + + bool IsEmpty() const { + return Queue.empty(); + } + + TLogoBlobID Front() const { + return Queue.front(); + } + + size_t GetNumItems() const { + return Queue.size(); + } + + ui64 GetNumWorkUnits() const { + return WorkUnits; + } + + void Sort() { + if (!std::is_sorted(Queue.begin(), Queue.end())) { + std::sort(Queue.begin(), Queue.end()); + } + } + }; + using TBlobIdQueuePtr = std::shared_ptr<TBlobIdQueue>; struct TUnreplicatedBlobRecord { // for monitoring purposes |
