aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-01-24 16:02:16 +0300
committeralexvru <alexvru@ydb.tech>2023-01-24 16:02:16 +0300
commitc4ba73bc1252fb6adf017574a7c9da046d1aa421 (patch)
tree3df885fc6a55de81d06b073ff12ced408ac29f91
parent2921faed8aa81f1b094500a99de13115d1f01aeb (diff)
downloadydb-c4ba73bc1252fb6adf017574a7c9da046d1aa421.tar.gz
Fix replication code
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp24
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h35
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp4
3 files changed, 33 insertions, 30 deletions
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
index 437b514a10..3922576af3 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
@@ -279,7 +279,7 @@ namespace NKikimr {
friend class TActorBootstrapped<THullReplJobActor>;
- std::optional<std::pair<TLogoBlobID, TRecoveryMachine::TPartSet>> CurrentKeyAndParts;
+ std::optional<TRecoveryMachine::TPartSet> CurrentItem;
TLogoBlobID LastProcessedKey;
void Finish() {
@@ -529,16 +529,16 @@ namespace NKikimr {
TimeAccount.SetState(ETimeState::MERGE);
// acquire current key; front item contains the least key
- if (!CurrentKeyAndParts) {
+ if (!CurrentItem) {
const TLogoBlobID id = MergeHeap.front()->GenLogoBlobId();
- CurrentKeyAndParts.emplace(id, ReplCtx->VCtx->Top->GType);
+ CurrentItem.emplace(id, ReplCtx->VCtx->Top->GType);
Y_VERIFY(std::exchange(LastProcessedKey, id) < id);
}
- auto& [currentKey, currentParts] = *CurrentKeyAndParts;
+ auto& item = *CurrentItem;
// find out which proxies carry items with the same key
TVector<TVDiskProxyPtr>::iterator lastIter = MergeHeap.end();
- while (lastIter != MergeHeap.begin() && MergeHeap.front()->GenLogoBlobId() == currentKey) {
+ while (lastIter != MergeHeap.begin() && MergeHeap.front()->GenLogoBlobId() == item.Id) {
PopHeap(MergeHeap.begin(), lastIter, TVDiskProxy::TPtrGreater());
--lastIter;
}
@@ -549,17 +549,17 @@ namespace NKikimr {
while (lastIter != MergeHeap.end()) {
// process all items with specified current key
TVDiskProxyPtr proxy = *lastIter;
- while (proxy->Valid() && proxy->GenLogoBlobId() == currentKey) {
+ while (proxy->Valid() && proxy->GenLogoBlobId() == item.Id) {
TLogoBlobID id;
NKikimrProto::EReplyStatus status;
TTrackableString data(TMemoryConsumer(ReplCtx->VCtx->Replication));
proxy->GetData(&id, &status, &data);
if (status != NKikimrProto::OK || data.size()) {
- currentParts.AddData(ReplCtx->VCtx->Top->GetOrderNumber(proxy->VDiskId), id, status, data.GetBaseConstRef());
+ item.AddData(ReplCtx->VCtx->Top->GetOrderNumber(proxy->VDiskId), id, status, data.GetBaseConstRef());
}
proxy->Next();
}
- Y_VERIFY_DEBUG(!proxy->Valid() || currentKey < proxy->GenLogoBlobId());
+ Y_VERIFY_DEBUG(!proxy->Valid() || item.Id < proxy->GenLogoBlobId());
// if proxy is not exhausted yet, then put it back into merge queue
if (proxy->Valid()) {
@@ -590,12 +590,12 @@ namespace NKikimr {
// recover data
NMatrix::TVectorType parts;
- if (!RecoveryMachine->Recover(currentKey, currentParts, RecoveryQueue, parts)) {
+ if (!RecoveryMachine->Recover(item, RecoveryQueue, parts)) {
STLOG(PRI_INFO, BS_REPL, BSVR33, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "Sending phantom validation query"),
- (GroupId, GInfo->GroupID), (CurKey, currentKey));
- PhantomChecksPending.emplace_back(currentKey, parts);
+ (GroupId, GInfo->GroupID), (CurKey, item.Id));
+ PhantomChecksPending.emplace_back(item.Id, parts);
}
- CurrentKeyAndParts.reset();
+ CurrentItem.reset();
// process recovered items, if any; queueProcessed.first will be false when writer is not ready for new data
EProcessQueueAction action = ProcessQueue();
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
index e3035f5cb9..64dd28a3f5 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
@@ -61,22 +61,26 @@ namespace NKikimr {
using TRecoveredBlobsQueue = TQueue<TRecoveredBlobInfo>;
struct TPartSet {
+ const TLogoBlobID Id;
TDataPartSet PartSet;
ui32 DisksRepliedOK = 0;
ui32 DisksRepliedNODATA = 0;
ui32 DisksRepliedNOT_YET = 0;
ui32 DisksRepliedOther = 0;
- TPartSet(TBlobStorageGroupType gtype) {
+ TPartSet(TLogoBlobID id, TBlobStorageGroupType gtype)
+ : Id(id)
+ {
+ PartSet.FullDataSize = Id.BlobSize();
PartSet.Parts.resize(gtype.TotalPartCount());
}
void AddData(ui32 diskIdx, const TLogoBlobID& id, NKikimrProto::EReplyStatus status, TString data) {
+ Y_VERIFY(id.FullID() == Id);
switch (status) {
case NKikimrProto::OK: {
const ui8 partIdx = id.PartId() - 1;
Y_VERIFY(partIdx < PartSet.Parts.size());
- PartSet.FullDataSize = id.BlobSize();
PartSet.PartsMask |= 1 << partIdx;
PartSet.Parts[partIdx].ReferenceTo(data);
DisksRepliedOK |= 1 << diskIdx;
@@ -118,7 +122,8 @@ namespace NKikimr {
, Arena(&TRopeArenaBackend::Allocate)
{}
- bool Recover(const TLogoBlobID& id, TPartSet& partSet, TRecoveredBlobsQueue& rbq, NMatrix::TVectorType& parts) {
+ bool Recover(TPartSet& item, TRecoveredBlobsQueue& rbq, NMatrix::TVectorType& parts) {
+ const TLogoBlobID& id = item.Id;
Y_VERIFY(!id.PartId());
Y_VERIFY(!LastRecoveredId || *LastRecoveredId < id);
LastRecoveredId = id;
@@ -147,15 +152,15 @@ namespace NKikimr {
bool hasExactParts = false;
bool needToRestore = false;
for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) {
- if (partSet.PartSet.PartsMask & (1 << i)) {
+ if (item.PartSet.PartsMask & (1 << i)) {
hasExactParts = true;
} else {
needToRestore = true;
}
}
- Y_VERIFY_DEBUG((partSet.PartSet.PartsMask >> groupType.TotalPartCount()) == 0);
- const ui32 presentParts = PopCount(partSet.PartSet.PartsMask);
+ Y_VERIFY_DEBUG((item.PartSet.PartsMask >> groupType.TotalPartCount()) == 0);
+ const ui32 presentParts = PopCount(item.PartSet.PartsMask);
bool canRestore = presentParts >= groupType.MinimalRestorablePartCount();
bool nonPhantom = true;
@@ -166,21 +171,19 @@ namespace NKikimr {
} else {
STLOG(PRI_INFO, BS_REPL, BSVR28, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "not enough data parts to recover"),
(BlobId, id), (NumPresentParts, presentParts), (MinParts, groupType.DataParts()),
- (PartSet, partSet.ToString()), (Ingress, lost.Ingress.ToString(ReplCtx->VCtx->Top.get(),
+ (PartSet, item.ToString()), (Ingress, lost.Ingress.ToString(ReplCtx->VCtx->Top.get(),
ReplCtx->VCtx->ShortSelfVDisk, id)));
BlobDone(id, false, &TEvReplFinished::TInfo::ItemsNotRecovered);
}
} else {
// recover
try {
- Y_VERIFY(partSet.PartSet.FullDataSize == id.BlobSize());
+ Y_VERIFY(item.PartSet.FullDataSize == id.BlobSize());
// PartSet contains some data, other data will be restored and written in the same PartSet
- TRope recoveredData;
if (canRestore && needToRestore) {
- groupType.RestoreData((TErasureType::ECrcMode)id.CrcMode(), partSet.PartSet, recoveredData,
- true, false, true);
- partSet.PartSet.PartsMask = (1 << groupType.TotalPartCount()) - 1;
+ groupType.RestoreData((TErasureType::ECrcMode)id.CrcMode(), item.PartSet, true, false, true);
+ item.PartSet.PartsMask = (1 << groupType.TotalPartCount()) - 1;
}
ui32 numSmallParts = 0, numMissingParts = 0, numHuge = 0;
@@ -188,7 +191,7 @@ namespace NKikimr {
NMatrix::TVectorType small(0, parts.GetSize());
for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) {
- if (~partSet.PartSet.PartsMask & (1 << i)) {
+ if (~item.PartSet.PartsMask & (1 << i)) {
++numMissingParts; // ignore this missing part
continue;
}
@@ -196,10 +199,10 @@ namespace NKikimr {
const ui32 partSize = groupType.PartSize(partId);
Y_VERIFY(partSize); // no metadata here
partsSize += partSize;
- TRope data(partSet.PartSet.Parts[i].OwnedString); // TODO(alexvru): employ rope in TDataPartSet
+ TRope data(item.PartSet.Parts[i].OwnedString); // TODO(alexvru): employ rope in TDataPartSet
Y_VERIFY(data.GetSize() == partSize);
if (ReplCtx->HugeBlobCtx->IsHugeBlob(groupType, id)) {
- AddBlobToQueue(partId, TDiskBlob::Create(partSet.PartSet.FullDataSize, i + 1,
+ AddBlobToQueue(partId, TDiskBlob::Create(id.BlobSize(), i + 1,
groupType.TotalPartCount(), std::move(data), Arena), {}, true, rbq);
++numHuge;
} else {
@@ -211,7 +214,7 @@ namespace NKikimr {
if (numSmallParts) {
// fill in disk blob buffer
AddBlobToQueue(id, TDiskBlob::CreateFromDistinctParts(&partData[0], &partData[numSmallParts],
- small, partSet.PartSet.FullDataSize, Arena), small, false, rbq);
+ small, id.BlobSize(), Arena), small, false, rbq);
}
ReplInfo->LogoBlobsRecovered += !!numSmallParts;
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
index fe835e0eac..45b50d9089 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
@@ -134,7 +134,7 @@ namespace NKikimr {
TBlobStorageGroupInfo::TServiceIds services;
groupInfo->PickSubgroup(id.Hash(), &varray, &services);
- NRepl::TRecoveryMachine::TPartSet p(groupInfo->Type);
+ NRepl::TRecoveryMachine::TPartSet p(id, groupInfo->Type);
for (ui32 i = 1; i < v.size(); ++i) {
if (v[i].empty()) {
continue;
@@ -150,7 +150,7 @@ namespace NKikimr {
}
NRepl::TRecoveryMachine::TRecoveredBlobsQueue rbq;
NMatrix::TVectorType parts;
- const bool success = m.Recover(id, p, rbq, parts);
+ const bool success = m.Recover(p, rbq, parts);
Y_VERIFY(success);
ui8 partIndex;