diff options
author | a-sumin <a-sumin@yandex-team.com> | 2023-05-02 18:09:13 +0300 |
---|---|---|
committer | a-sumin <a-sumin@yandex-team.com> | 2023-05-02 18:09:13 +0300 |
commit | 11aff5b3add429c636e48bc9cd4198448e95a5e4 (patch) | |
tree | 98c6e68a12394e3d8990d3984c91c38784d460f8 | |
parent | 4e8ead9a9a16fed820ecd8030e67dc878e03bc74 (diff) | |
download | ydb-11aff5b3add429c636e48bc9cd4198448e95a5e4.tar.gz |
Handle read errors during defrag
-rw-r--r-- | ydb/core/blobstorage/ut_blobstorage/defrag.cpp | 100 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp | 33 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h | 2 |
3 files changed, 130 insertions, 5 deletions
diff --git a/ydb/core/blobstorage/ut_blobstorage/defrag.cpp b/ydb/core/blobstorage/ut_blobstorage/defrag.cpp index f17f60066c5..d0face737ed 100644 --- a/ydb/core/blobstorage/ut_blobstorage/defrag.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/defrag.cpp @@ -1,5 +1,7 @@ #include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_private_events.h> +#include <ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h> +#include <ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.h> static TIntrusivePtr<TBlobStorageGroupInfo> PrepareEnv(TEnvironmentSetup& env, TVector<TLogoBlobID> *keep) { env.CreateBoxAndPool(1, 1); @@ -233,4 +235,102 @@ Y_UNIT_TEST_SUITE(Defragmentation) { env.Sim(TDuration::Minutes(5)); UNIT_ASSERT_VALUES_EQUAL(putCounter, 1); } + + void TestReadErrorHandlingBase(std::function<NPDisk::TEvChunkReadResult*(const NPDisk::TEvChunkReadResult*)> messChunkReadResult) { + TEnvironmentSetup env(TEnvironmentSetup::TSettings{ + .NodeCount = 8, + .Erasure = TBlobStorageGroupType::ErasureMirror3of4, + }); + + TVector<TLogoBlobID> keep; + TIntrusivePtr<TBlobStorageGroupInfo> info = PrepareEnv(env, &keep); + if (!info) { + return; + } + + std::optional<TActorId> rewriterActorId; + TAutoPtr<NPDisk::TEvChunkReadResult> readMsg; + bool caughtRestore = false; + bool caughtDone = false; + env.Runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) { + Y_UNUSED(nodeId); + switch(ev->Type) { + case TEvBlobStorage::EvChunkReadResult: + if (!rewriterActorId.has_value() && IsDefragRewriter(env.Runtime->GetActor(ev->Recipient))) { + rewriterActorId = ev->Recipient; + + readMsg = ev->Release<NPDisk::TEvChunkReadResult>(); + env.Runtime->Send( + new IEventHandle( + *rewriterActorId, + ev->Sender, + messChunkReadResult(readMsg.Get())), + ev->Sender.NodeId()); + + return false; + } + return true; + case TEvBlobStorage::EvDefragRewritten: + if (rewriterActorId == ev->Sender) { + 
UNIT_ASSERT_VALUES_EQUAL(caughtDone, false); + caughtDone = true; + + const TEvDefragRewritten* msg = ev->Get<TEvDefragRewritten>(); + UNIT_ASSERT_VALUES_EQUAL(msg->RewrittenRecs, 18); + UNIT_ASSERT_VALUES_EQUAL(msg->RewrittenBytes, 9961567); + } + return true; + case TEvBlobStorage::EvRestoreCorruptedBlob: + if (rewriterActorId == ev->Sender) { + UNIT_ASSERT_VALUES_EQUAL(caughtRestore, false); + caughtRestore = true; + + const TEvRestoreCorruptedBlob* msg = ev->Get<TEvRestoreCorruptedBlob>(); + UNIT_ASSERT_VALUES_EQUAL(msg->WriteRestoredParts, true); + UNIT_ASSERT_VALUES_EQUAL(msg->ReportNonrestoredParts, false); + UNIT_ASSERT_VALUES_EQUAL(msg->Items.size(), 1); + const TEvRecoverBlob::TItem& item = msg->Items[0]; + UNIT_ASSERT_VALUES_EQUAL(item.CorruptedPart.ChunkIdx, readMsg->ChunkIdx); + UNIT_ASSERT_VALUES_EQUAL(item.CorruptedPart.Offset, readMsg->Offset); + UNIT_ASSERT_VALUES_EQUAL(item.CorruptedPart.Size, readMsg->Data.Size()); + } + default: + return true; + } + }; + while (!caughtDone || !caughtRestore) { + env.Sim(TDuration::Minutes(1)); + } + } + + Y_UNIT_TEST(CorruptedReadHandling) { + TestReadErrorHandlingBase([] (const NPDisk::TEvChunkReadResult* msg) { + return new NPDisk::TEvChunkReadResult( + NKikimrProto::EReplyStatus::CORRUPTED, + msg->ChunkIdx, + msg->Offset, + msg->Cookie, + msg->StatusFlags, + msg->ErrorReason + ); + }); + } + + Y_UNIT_TEST(GappedReadHandling) { + TestReadErrorHandlingBase([] (const NPDisk::TEvChunkReadResult* msg) { + NPDisk::TEvChunkReadResult* res = new NPDisk::TEvChunkReadResult( + NKikimrProto::EReplyStatus::OK, + msg->ChunkIdx, + msg->Offset, + msg->Cookie, + msg->StatusFlags, + msg->ErrorReason + ); + + res->Data.SetData(msg->Data.ToString()); + res->Data.AddGap(0, res->Data.Size()); + + return res; + }); + } } diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp index 6978dfc4bf1..3e9b4d936f5 100644 --- 
a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp +++ b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp @@ -1,4 +1,5 @@ #include "defrag_rewriter.h" +#include <ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.h> #include <ydb/core/blobstorage/vdisk/skeleton/blobstorage_takedbsnap.h> namespace NKikimr { @@ -18,6 +19,7 @@ namespace NKikimr { std::vector<TDefragRecord> Recs; size_t RecToReadIdx = 0; size_t RewrittenRecsCounter = 0; + size_t SkippedRecsCounter = 0; size_t RewrittenBytes = 0; struct TCheckLocationMerger { @@ -63,7 +65,7 @@ namespace NKikimr { void SendNextRead(const TActorContext &ctx) { if (RecToReadIdx < Recs.size()) { ctx.Send(DCtx->SkeletonId, new TEvTakeHullSnapshot(false)); - } else if (RewrittenRecsCounter == Recs.size()) { + } else { ctx.Send(NotifyId, new TEvDefragRewritten(RewrittenRecsCounter, RewrittenBytes)); Die(ctx); } @@ -93,19 +95,36 @@ namespace NKikimr { } ++RecToReadIdx; - ++RewrittenRecsCounter; + ++SkippedRecsCounter; SendNextRead(ctx); } void Handle(NPDisk::TEvChunkReadResult::TPtr ev, const TActorContext& ctx) { FullSnap.reset(); - // FIXME: handle read errors gracefully - CHECK_PDISK_RESPONSE_READABLE(DCtx->VCtx, ev, ctx); - auto *msg = ev->Get(); const TDefragRecord &rec = Recs[RecToReadIdx++]; + if (msg->Status == NKikimrProto::CORRUPTED || (msg->Status == NKikimrProto::OK && !msg->Data.IsReadable())) { + LOG_WARN_S(ctx, NKikimrServices::BS_VDISK_DEFRAG, + "Defrag skipping corrupted blob #" << rec.LogoBlobId << " on " << rec.OldDiskPart.ToString()); + const TBlobStorageGroupType gtype = DCtx->VCtx->Top->GType; + Send(DCtx->SkeletonId, + new TEvRestoreCorruptedBlob( + ctx.Now() + TDuration::Minutes(2), + {TEvRestoreCorruptedBlob::TItem( + rec.LogoBlobId.FullID(), + NMatrix::TVectorType::MakeOneHot(rec.LogoBlobId.PartId() - 1, gtype.TotalPartCount()), + gtype, + rec.OldDiskPart)}, + true, false)); + + ++SkippedRecsCounter; + SendNextRead(ctx); + return; + } + CHECK_PDISK_RESPONSE_READABLE(DCtx->VCtx, ev, 
ctx); + const auto &gtype = DCtx->VCtx->Top->GType; ui8 partId = rec.LogoBlobId.PartId(); Y_VERIFY(partId); @@ -182,4 +201,8 @@ namespace NKikimr { return new TDefragRewriter(dCtx, selfVDiskId, notifyId, std::move(recs)); } + bool IsDefragRewriter(const IActor* actor) { + return dynamic_cast<const TDefragRewriter*>(actor) != nullptr; + } + } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h index dba032d87ae..537f6b49fe7 100644 --- a/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h +++ b/ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.h @@ -38,5 +38,7 @@ namespace NKikimr { const TVDiskID &selfVDiskId, const TActorId &notifyId, std::vector<TDefragRecord> &&recs); + + bool IsDefragRewriter(const IActor* actor); } // NKikimr |