aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author a-sumin <a-sumin@yandex-team.com> 2023-05-02 18:09:13 +0300
committer a-sumin <a-sumin@yandex-team.com> 2023-05-02 18:09:13 +0300
commit 11aff5b3add429c636e48bc9cd4198448e95a5e4 (patch)
tree 98c6e68a12394e3d8990d3984c91c38784d460f8
parent 4e8ead9a9a16fed820ecd8030e67dc878e03bc74 (diff)
download ydb-11aff5b3add429c636e48bc9cd4198448e95a5e4.tar.gz
Handle read errors during defrag
-rw-r--r-- ydb/core/blobstorage/ut_blobstorage/defrag.cpp 100
-rw-r--r-- ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp 33
-rw-r--r-- ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h 2
3 files changed, 130 insertions, 5 deletions
diff --git a/ydb/core/blobstorage/ut_blobstorage/defrag.cpp b/ydb/core/blobstorage/ut_blobstorage/defrag.cpp
index f17f60066c5..d0face737ed 100644
--- a/ydb/core/blobstorage/ut_blobstorage/defrag.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/defrag.cpp
@@ -1,5 +1,7 @@
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_private_events.h>
+#include <ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h>
+#include <ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.h>
static TIntrusivePtr<TBlobStorageGroupInfo> PrepareEnv(TEnvironmentSetup& env, TVector<TLogoBlobID> *keep) {
env.CreateBoxAndPool(1, 1);
@@ -233,4 +235,102 @@ Y_UNIT_TEST_SUITE(Defragmentation) {
env.Sim(TDuration::Minutes(5));
UNIT_ASSERT_VALUES_EQUAL(putCounter, 1);
}
+
+    /**
+     * Shared driver for the defrag read-error tests.
+     *
+     * Intercepts the first TEvChunkReadResult addressed to a defrag rewriter actor, sends the event
+     * produced by `messChunkReadResult` in its place, and then keeps simulating until both of the
+     * following were observed coming from that rewriter:
+     *   - a TEvRestoreCorruptedBlob request describing the part that failed to read, and
+     *   - the final TEvDefragRewritten notification.
+     *
+     * @param messChunkReadResult builds the tampered read result from the intercepted original; the
+     *                            returned event is wrapped into a new IEventHandle and sent to the
+     *                            rewriter.
+     */
+    void TestReadErrorHandlingBase(std::function<NPDisk::TEvChunkReadResult*(const NPDisk::TEvChunkReadResult*)> messChunkReadResult) {
+        TEnvironmentSetup env(TEnvironmentSetup::TSettings{
+            .NodeCount = 8,
+            .Erasure = TBlobStorageGroupType::ErasureMirror3of4,
+        });
+
+        TVector<TLogoBlobID> keep;
+        TIntrusivePtr<TBlobStorageGroupInfo> info = PrepareEnv(env, &keep);
+        if (!info) {
+            return;
+        }
+
+        // Set once the first read result for a defrag rewriter has been intercepted.
+        std::optional<TActorId> rewriterActorId;
+        // The untouched read result; kept alive so the restore request can be compared against it.
+        TAutoPtr<NPDisk::TEvChunkReadResult> readMsg;
+        bool caughtRestore = false;
+        bool caughtDone = false;
+        env.Runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
+            Y_UNUSED(nodeId);
+            switch (ev->Type) {
+                case TEvBlobStorage::EvChunkReadResult:
+                    // Only the very first read result of a defrag rewriter is tampered with.
+                    if (!rewriterActorId.has_value() && IsDefragRewriter(env.Runtime->GetActor(ev->Recipient))) {
+                        rewriterActorId = ev->Recipient;
+
+                        readMsg = ev->Release<NPDisk::TEvChunkReadResult>();
+                        env.Runtime->Send(
+                            new IEventHandle(
+                                *rewriterActorId,
+                                ev->Sender,
+                                messChunkReadResult(readMsg.Get())),
+                            ev->Sender.NodeId());
+
+                        // NOTE(review): returning false presumably drops the original event, so only
+                        // the tampered copy reaches the rewriter -- confirm against FilterFunction.
+                        return false;
+                    }
+                    return true;
+                case TEvBlobStorage::EvDefragRewritten:
+                    if (rewriterActorId == ev->Sender) {
+                        UNIT_ASSERT_VALUES_EQUAL(caughtDone, false);
+                        caughtDone = true;
+
+                        // Expected totals depend on the data set written by PrepareEnv (not visible here).
+                        const TEvDefragRewritten* msg = ev->Get<TEvDefragRewritten>();
+                        UNIT_ASSERT_VALUES_EQUAL(msg->RewrittenRecs, 18);
+                        UNIT_ASSERT_VALUES_EQUAL(msg->RewrittenBytes, 9961567);
+                    }
+                    return true;
+                case TEvBlobStorage::EvRestoreCorruptedBlob:
+                    if (rewriterActorId == ev->Sender) {
+                        UNIT_ASSERT_VALUES_EQUAL(caughtRestore, false);
+                        caughtRestore = true;
+
+                        // The restore request must point at exactly the location of the tampered read.
+                        const TEvRestoreCorruptedBlob* msg = ev->Get<TEvRestoreCorruptedBlob>();
+                        UNIT_ASSERT_VALUES_EQUAL(msg->WriteRestoredParts, true);
+                        UNIT_ASSERT_VALUES_EQUAL(msg->ReportNonrestoredParts, false);
+                        UNIT_ASSERT_VALUES_EQUAL(msg->Items.size(), 1);
+                        const TEvRecoverBlob::TItem& item = msg->Items[0];
+                        UNIT_ASSERT_VALUES_EQUAL(item.CorruptedPart.ChunkIdx, readMsg->ChunkIdx);
+                        UNIT_ASSERT_VALUES_EQUAL(item.CorruptedPart.Offset, readMsg->Offset);
+                        UNIT_ASSERT_VALUES_EQUAL(item.CorruptedPart.Size, readMsg->Data.Size());
+                    }
+                    // Explicit return: this case used to fall through into `default` implicitly.
+                    return true;
+                default:
+                    return true;
+            }
+        };
+        // No upper bound on the wait: if either event never arrives, the test hangs until the
+        // harness times out instead of failing an assertion.
+        while (!caughtDone || !caughtRestore) {
+            env.Sim(TDuration::Minutes(1));
+        }
+    }
+
+    // Defrag must survive a read that comes back with status CORRUPTED: the tampered reply copies
+    // every header field of the original result and only swaps the status.
+    Y_UNIT_TEST(CorruptedReadHandling) {
+        const auto makeCorruptedReply = [](const NPDisk::TEvChunkReadResult* original) {
+            return new NPDisk::TEvChunkReadResult(
+                NKikimrProto::EReplyStatus::CORRUPTED,
+                original->ChunkIdx,
+                original->Offset,
+                original->Cookie,
+                original->StatusFlags,
+                original->ErrorReason);
+        };
+
+        TestReadErrorHandlingBase(makeCorruptedReply);
+    }
+
+    // Defrag must survive a read that reports OK but whose payload is entirely covered by a gap.
+    Y_UNIT_TEST(GappedReadHandling) {
+        const auto makeGappedReply = [](const NPDisk::TEvChunkReadResult* original) {
+            auto* gapped = new NPDisk::TEvChunkReadResult(
+                NKikimrProto::EReplyStatus::OK,
+                original->ChunkIdx,
+                original->Offset,
+                original->Cookie,
+                original->StatusFlags,
+                original->ErrorReason);
+
+            // Copy the original payload, then mark its full length as a gap.
+            gapped->Data.SetData(original->Data.ToString());
+            gapped->Data.AddGap(0, gapped->Data.Size());
+            return gapped;
+        };
+
+        TestReadErrorHandlingBase(makeGappedReply);
+    }
}
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp
index 6978dfc4bf1..3e9b4d936f5 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp
@@ -1,4 +1,5 @@
#include "defrag_rewriter.h"
+#include <ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.h>
#include <ydb/core/blobstorage/vdisk/skeleton/blobstorage_takedbsnap.h>
namespace NKikimr {
@@ -18,6 +19,7 @@ namespace NKikimr {
std::vector<TDefragRecord> Recs;
size_t RecToReadIdx = 0;
size_t RewrittenRecsCounter = 0;
+ size_t SkippedRecsCounter = 0;
size_t RewrittenBytes = 0;
struct TCheckLocationMerger {
@@ -63,7 +65,7 @@ namespace NKikimr {
void SendNextRead(const TActorContext &ctx) {
if (RecToReadIdx < Recs.size()) {
ctx.Send(DCtx->SkeletonId, new TEvTakeHullSnapshot(false));
- } else if (RewrittenRecsCounter == Recs.size()) {
+ } else {
ctx.Send(NotifyId, new TEvDefragRewritten(RewrittenRecsCounter, RewrittenBytes));
Die(ctx);
}
@@ -93,19 +95,36 @@ namespace NKikimr {
}
++RecToReadIdx;
- ++RewrittenRecsCounter;
+ ++SkippedRecsCounter;
SendNextRead(ctx);
}
void Handle(NPDisk::TEvChunkReadResult::TPtr ev, const TActorContext& ctx) {
FullSnap.reset();
- // FIXME: handle read errors gracefully
- CHECK_PDISK_RESPONSE_READABLE(DCtx->VCtx, ev, ctx);
-
auto *msg = ev->Get();
const TDefragRecord &rec = Recs[RecToReadIdx++];
+ if (msg->Status == NKikimrProto::CORRUPTED || (msg->Status == NKikimrProto::OK && !msg->Data.IsReadable())) {
+ LOG_WARN_S(ctx, NKikimrServices::BS_VDISK_DEFRAG,
+ "Defrag skipping corrupted blob #" << rec.LogoBlobId << " on " << rec.OldDiskPart.ToString());
+ const TBlobStorageGroupType gtype = DCtx->VCtx->Top->GType;
+ Send(DCtx->SkeletonId,
+ new TEvRestoreCorruptedBlob(
+ ctx.Now() + TDuration::Minutes(2),
+ {TEvRestoreCorruptedBlob::TItem(
+ rec.LogoBlobId.FullID(),
+ NMatrix::TVectorType::MakeOneHot(rec.LogoBlobId.PartId() - 1, gtype.TotalPartCount()),
+ gtype,
+ rec.OldDiskPart)},
+ true, false));
+
+ ++SkippedRecsCounter;
+ SendNextRead(ctx);
+ return;
+ }
+ CHECK_PDISK_RESPONSE_READABLE(DCtx->VCtx, ev, ctx);
+
const auto &gtype = DCtx->VCtx->Top->GType;
ui8 partId = rec.LogoBlobId.PartId();
Y_VERIFY(partId);
@@ -182,4 +201,8 @@ namespace NKikimr {
return new TDefragRewriter(dCtx, selfVDiskId, notifyId, std::move(recs));
}
+    // Reports whether `actor` is a TDefragRewriter instance; yields false for nullptr and for any
+    // other actor type.
+    bool IsDefragRewriter(const IActor* actor) {
+        const auto* rewriter = dynamic_cast<const TDefragRewriter*>(actor);
+        return rewriter != nullptr;
+    }
+
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h
index dba032d87ae..537f6b49fe7 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h
@@ -38,5 +38,7 @@ namespace NKikimr {
const TVDiskID &selfVDiskId,
const TActorId &notifyId,
std::vector<TDefragRecord> &&recs);
+
+ bool IsDefragRewriter(const IActor* actor); // true iff `actor` is a TDefragRewriter (checked via dynamic_cast); used by the defrag unit tests
} // NKikimr