diff options
author | alexvru <alexvru@ydb.tech> | 2023-01-24 16:02:16 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-01-24 16:02:16 +0300 |
commit | c4ba73bc1252fb6adf017574a7c9da046d1aa421 (patch) | |
tree | 3df885fc6a55de81d06b073ff12ced408ac29f91 | |
parent | 2921faed8aa81f1b094500a99de13115d1f01aeb (diff) | |
download | ydb-c4ba73bc1252fb6adf017574a7c9da046d1aa421.tar.gz |
Fix replication code
3 files changed, 33 insertions, 30 deletions
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp index 437b514a10..3922576af3 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp @@ -279,7 +279,7 @@ namespace NKikimr { friend class TActorBootstrapped<THullReplJobActor>; - std::optional<std::pair<TLogoBlobID, TRecoveryMachine::TPartSet>> CurrentKeyAndParts; + std::optional<TRecoveryMachine::TPartSet> CurrentItem; TLogoBlobID LastProcessedKey; void Finish() { @@ -529,16 +529,16 @@ namespace NKikimr { TimeAccount.SetState(ETimeState::MERGE); // acquire current key; front item contains the least key - if (!CurrentKeyAndParts) { + if (!CurrentItem) { const TLogoBlobID id = MergeHeap.front()->GenLogoBlobId(); - CurrentKeyAndParts.emplace(id, ReplCtx->VCtx->Top->GType); + CurrentItem.emplace(id, ReplCtx->VCtx->Top->GType); Y_VERIFY(std::exchange(LastProcessedKey, id) < id); } - auto& [currentKey, currentParts] = *CurrentKeyAndParts; + auto& item = *CurrentItem; // find out which proxies carry items with the same key TVector<TVDiskProxyPtr>::iterator lastIter = MergeHeap.end(); - while (lastIter != MergeHeap.begin() && MergeHeap.front()->GenLogoBlobId() == currentKey) { + while (lastIter != MergeHeap.begin() && MergeHeap.front()->GenLogoBlobId() == item.Id) { PopHeap(MergeHeap.begin(), lastIter, TVDiskProxy::TPtrGreater()); --lastIter; } @@ -549,17 +549,17 @@ namespace NKikimr { while (lastIter != MergeHeap.end()) { // process all items with specified current key TVDiskProxyPtr proxy = *lastIter; - while (proxy->Valid() && proxy->GenLogoBlobId() == currentKey) { + while (proxy->Valid() && proxy->GenLogoBlobId() == item.Id) { TLogoBlobID id; NKikimrProto::EReplyStatus status; TTrackableString data(TMemoryConsumer(ReplCtx->VCtx->Replication)); proxy->GetData(&id, &status, &data); if (status != NKikimrProto::OK || data.size()) { - currentParts.AddData(ReplCtx->VCtx->Top->GetOrderNumber(proxy->VDiskId), id, status, data.GetBaseConstRef()); + item.AddData(ReplCtx->VCtx->Top->GetOrderNumber(proxy->VDiskId), id, status, data.GetBaseConstRef()); } proxy->Next(); } - Y_VERIFY_DEBUG(!proxy->Valid() || currentKey < proxy->GenLogoBlobId()); + Y_VERIFY_DEBUG(!proxy->Valid() || item.Id < proxy->GenLogoBlobId()); // if proxy is not exhausted yet, then put it back into merge queue if (proxy->Valid()) { @@ -590,12 +590,12 @@ namespace NKikimr { // recover data NMatrix::TVectorType parts; - if (!RecoveryMachine->Recover(currentKey, currentParts, RecoveryQueue, parts)) { + if (!RecoveryMachine->Recover(item, RecoveryQueue, parts)) { STLOG(PRI_INFO, BS_REPL, BSVR33, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "Sending phantom validation query"), - (GroupId, GInfo->GroupID), (CurKey, currentKey)); - PhantomChecksPending.emplace_back(currentKey, parts); + (GroupId, GInfo->GroupID), (CurKey, item.Id)); + PhantomChecksPending.emplace_back(item.Id, parts); } - CurrentKeyAndParts.reset(); + CurrentItem.reset(); // process recovered items, if any; queueProcessed.first will be false when writer is not ready for new data EProcessQueueAction action = ProcessQueue(); diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h index e3035f5cb9..64dd28a3f5 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h @@ -61,22 +61,26 @@ namespace NKikimr { using TRecoveredBlobsQueue = TQueue<TRecoveredBlobInfo>; struct TPartSet { + const TLogoBlobID Id; TDataPartSet PartSet; ui32 DisksRepliedOK = 0; ui32 DisksRepliedNODATA = 0; ui32 DisksRepliedNOT_YET = 0; ui32 DisksRepliedOther = 0; - TPartSet(TBlobStorageGroupType gtype) { + TPartSet(TLogoBlobID id, TBlobStorageGroupType gtype) + : Id(id) + { + PartSet.FullDataSize = Id.BlobSize(); PartSet.Parts.resize(gtype.TotalPartCount()); } void AddData(ui32 diskIdx, const TLogoBlobID& id, NKikimrProto::EReplyStatus status, TString data) { + Y_VERIFY(id.FullID() == Id); switch (status) { case NKikimrProto::OK: { const ui8 partIdx = id.PartId() - 1; Y_VERIFY(partIdx < PartSet.Parts.size()); - PartSet.FullDataSize = id.BlobSize(); PartSet.PartsMask |= 1 << partIdx; PartSet.Parts[partIdx].ReferenceTo(data); DisksRepliedOK |= 1 << diskIdx; @@ -118,7 +122,8 @@ namespace NKikimr { , Arena(&TRopeArenaBackend::Allocate) {} - bool Recover(const TLogoBlobID& id, TPartSet& partSet, TRecoveredBlobsQueue& rbq, NMatrix::TVectorType& parts) { + bool Recover(TPartSet& item, TRecoveredBlobsQueue& rbq, NMatrix::TVectorType& parts) { + const TLogoBlobID& id = item.Id; Y_VERIFY(!id.PartId()); Y_VERIFY(!LastRecoveredId || *LastRecoveredId < id); LastRecoveredId = id; @@ -147,15 +152,15 @@ namespace NKikimr { bool hasExactParts = false; bool needToRestore = false; for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) { - if (partSet.PartSet.PartsMask & (1 << i)) { + if (item.PartSet.PartsMask & (1 << i)) { hasExactParts = true; } else { needToRestore = true; } } - Y_VERIFY_DEBUG((partSet.PartSet.PartsMask >> groupType.TotalPartCount()) == 0); - const ui32 presentParts = PopCount(partSet.PartSet.PartsMask); + Y_VERIFY_DEBUG((item.PartSet.PartsMask >> groupType.TotalPartCount()) == 0); + const ui32 presentParts = PopCount(item.PartSet.PartsMask); bool canRestore = presentParts >= groupType.MinimalRestorablePartCount(); bool nonPhantom = true; @@ -166,21 +171,19 @@ namespace NKikimr { } else { STLOG(PRI_INFO, BS_REPL, BSVR28, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "not enough data parts to recover"), (BlobId, id), (NumPresentParts, presentParts), (MinParts, groupType.DataParts()), - (PartSet, partSet.ToString()), (Ingress, lost.Ingress.ToString(ReplCtx->VCtx->Top.get(), + (PartSet, item.ToString()), (Ingress, lost.Ingress.ToString(ReplCtx->VCtx->Top.get(), ReplCtx->VCtx->ShortSelfVDisk, id))); BlobDone(id, false, &TEvReplFinished::TInfo::ItemsNotRecovered); } } else { // recover try { - Y_VERIFY(partSet.PartSet.FullDataSize == id.BlobSize()); + Y_VERIFY(item.PartSet.FullDataSize == id.BlobSize()); // PartSet contains some data, other data will be restored and written in the same PartSet - TRope recoveredData; if (canRestore && needToRestore) { - groupType.RestoreData((TErasureType::ECrcMode)id.CrcMode(), partSet.PartSet, recoveredData, - true, false, true); - partSet.PartSet.PartsMask = (1 << groupType.TotalPartCount()) - 1; + groupType.RestoreData((TErasureType::ECrcMode)id.CrcMode(), item.PartSet, true, false, true); + item.PartSet.PartsMask = (1 << groupType.TotalPartCount()) - 1; } ui32 numSmallParts = 0, numMissingParts = 0, numHuge = 0; @@ -188,7 +191,7 @@ namespace NKikimr { NMatrix::TVectorType small(0, parts.GetSize()); for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) { - if (~partSet.PartSet.PartsMask & (1 << i)) { + if (~item.PartSet.PartsMask & (1 << i)) { ++numMissingParts; // ignore this missing part continue; } @@ -196,10 +199,10 @@ namespace NKikimr { const ui32 partSize = groupType.PartSize(partId); Y_VERIFY(partSize); // no metadata here partsSize += partSize; - TRope data(partSet.PartSet.Parts[i].OwnedString); // TODO(alexvru): employ rope in TDataPartSet + TRope data(item.PartSet.Parts[i].OwnedString); // TODO(alexvru): employ rope in TDataPartSet Y_VERIFY(data.GetSize() == partSize); if (ReplCtx->HugeBlobCtx->IsHugeBlob(groupType, id)) { - AddBlobToQueue(partId, TDiskBlob::Create(partSet.PartSet.FullDataSize, i + 1, + AddBlobToQueue(partId, TDiskBlob::Create(id.BlobSize(), i + 1, groupType.TotalPartCount(), std::move(data), Arena), {}, true, rbq); ++numHuge; } else { @@ -211,7 +214,7 @@ namespace NKikimr { if (numSmallParts) { // fill in disk blob buffer AddBlobToQueue(id, TDiskBlob::CreateFromDistinctParts(&partData[0], &partData[numSmallParts], - small, partSet.PartSet.FullDataSize, Arena), small, false, rbq); + small, id.BlobSize(), Arena), small, false, rbq); } ReplInfo->LogoBlobsRecovered += !!numSmallParts; diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp index fe835e0eac..45b50d9089 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp @@ -134,7 +134,7 @@ namespace NKikimr { TBlobStorageGroupInfo::TServiceIds services; groupInfo->PickSubgroup(id.Hash(), &varray, &services); - NRepl::TRecoveryMachine::TPartSet p(groupInfo->Type); + NRepl::TRecoveryMachine::TPartSet p(id, groupInfo->Type); for (ui32 i = 1; i < v.size(); ++i) { if (v[i].empty()) { continue; @@ -150,7 +150,7 @@ namespace NKikimr { } NRepl::TRecoveryMachine::TRecoveredBlobsQueue rbq; NMatrix::TVectorType parts; - const bool success = m.Recover(id, p, rbq, parts); + const bool success = m.Recover(p, rbq, parts); Y_VERIFY(success); ui8 partIndex; |