diff options
author | alexvru <alexvru@ydb.tech> | 2023-04-13 09:55:01 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-04-13 09:55:01 +0300 |
commit | 5453009c8feef430339b6d7dbeb811a8dbb73d50 (patch) | |
tree | b84fa66015fb17319f427b0326dc31924120661a | |
parent | 6d0c4e1ba27ee43298c7bfc6c857f8190c6ab337 (diff) | |
download | ydb-5453009c8feef430339b6d7dbeb811a8dbb73d50.tar.gz |
Fix PDiskFIT
-rw-r--r-- | ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.cpp | 152 | ||||
-rw-r--r-- | ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.h | 4 | ||||
-rw-r--r-- | ydb/core/blobstorage/ut_pdiskfit/lib/fail_injection_test.h | 20 |
3 files changed, 120 insertions, 56 deletions
diff --git a/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.cpp b/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.cpp index 22f28131794..d9dd9505126 100644 --- a/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.cpp +++ b/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.cpp @@ -154,7 +154,39 @@ class TFakeVDisk } str << x.Lsn << ":" << x.Signature.ToString() << ":" << x.DataLen; } - str << "] FirstLsnToKeep# " << FirstLsnToKeep << "}"; + str << "] FirstLsnToKeep# " << FirstLsnToKeep; + str << " Chunks# {"; + first = true; + for (const auto& [chunkIdx, chunk] : Chunks) { + if (first) { + first = false; + } else { + str << ' '; + } + str << chunkIdx << ":"; + switch (chunk.GetCommitState()) { + case ECommitState::RESERVED: + str << 'R'; + break; + + case ECommitState::COMMIT_IN_PROGRESS: + str << 'c'; + break; + + case ECommitState::COMMITTED: + str << 'C'; + break; + + case ECommitState::DELETE_IN_PROGRESS: + str << 'd'; + break; + + case ECommitState::DELETED: + Y_FAIL(); + } + } + str << '}'; + str << '}'; return str.Str(); } @@ -273,6 +305,9 @@ class TFakeVDisk bool StateVerified = false; + std::vector<ui32> ChunksToForget; + THashSet<ui32> OwnedChunks; + public: TFakeVDisk(const TVDiskID& vdiskId, const TActorId& pdiskServiceId, ui64 pdiskGuid, TStateManager *stateManager, TFakeVDiskParams params) @@ -321,7 +356,7 @@ public: } TString SelfInfo() const { - return TStringBuilder() << " VDiskId# " << VDiskId.ToStringWOGeneration() << " Owner# " << PDiskParams->Owner; + return TStringBuilder() << "VDiskId# " << VDiskId.ToStringWOGeneration() << " Owner# " << PDiskParams->Owner; } void Bootstrap(const TActorContext& ctx) { @@ -335,29 +370,32 @@ public: PDiskParams = msg->PDiskParams; State.BlocksInChunk = PDiskParams->ChunkSize / PDiskParams->AppendBlockSize; - THashSet<TChunkIdx> owned(msg->OwnedChunks.begin(), msg->OwnedChunks.end()); + TStringStream str; + str << SelfInfo() << " starting, owned chunks# " << FormatList(msg->OwnedChunks) << Endl; + Cerr << str.Str(); + + OwnedChunks = {msg->OwnedChunks.begin(), msg->OwnedChunks.end()}; + IssueReadLogRequest(ctx); + } + + void VerifyOwnedChunks(const TActorContext& /*ctx*/) { for (const auto& [idx, info] : Recovered.Chunks) { if (info.GetCommitState() == ECommitState::COMMITTED) { - Y_VERIFY_S(owned.count(idx), SelfInfo() << " has commited chunk# " << idx << " from Recovered.Chunks," + Y_VERIFY_S(OwnedChunks.count(idx), SelfInfo() << " has commited chunk# " << idx << " from Recovered.Chunks," << " but can't find it in OwnedChunks list from PDisk"); - owned.erase(idx); + OwnedChunks.erase(idx); } } - for (auto idx : owned) { + for (auto idx : OwnedChunks) { auto it = Recovered.Chunks.find(idx); Y_VERIFY_S(it != Recovered.Chunks.end(), SelfInfo() << " has owned chunk# " << idx - << " from PDisks's OwnedChunks, but can't find it in Recovered list"); + << " from PDisks's OwnedChunks, but can't find it in Recovered list" + << " Recovered# " << Recovered.ToString()); auto& info = it->second; Y_VERIFY(info.GetCommitState() == ECommitState::COMMIT_IN_PROGRESS || info.GetCommitState() == ECommitState::DELETE_IN_PROGRESS); } - - TStringStream str; - str << SelfInfo() << " starting, owned chunks# " << FormatList(msg->OwnedChunks) << Endl; - Cerr << str.Str(); - - IssueReadLogRequest(ctx); } void IssueReadLogRequest(const TActorContext& ctx) { @@ -385,6 +423,7 @@ public: if (msg->IsEndOfLog) { VerifyRecoveredLog(ctx); + VerifyOwnedChunks(ctx); } else { ReadLogPosition = msg->NextPosition; IssueReadLogRequest(ctx); @@ -401,11 +440,14 @@ public: // there is a record in current set (recovered from PDisk), but not in previous set (stored state); there // may be an item in flight? auto it = Recovered.InFlight.find(*current); - Y_VERIFY_S(it != Recovered.InFlight.end(), "unexpected log record " << SelfInfo() << " Lsn# " << current->Lsn - << " Signature# " << current->Signature.ToString()); - Y_VERIFY_S(it->DataLen == current->DataLen && it->Checksum == current->Checksum && - it->Signature == current->Signature, SelfInfo() << "Lsn# " << it->Lsn + Y_VERIFY_S(it != Recovered.InFlight.end() || current->Lsn < Recovered.FirstLsnToKeep, + "unexpected log record " << SelfInfo() << " Lsn# " << current->Lsn + << " Signature# " << current->Signature.ToString()); + if (it != Recovered.InFlight.end()) { + Y_VERIFY_S(it->DataLen == current->DataLen && it->Checksum == current->Checksum && + it->Signature == current->Signature, SelfInfo() << " Lsn# " << it->Lsn << " InFlightData# " << it->DataLen << " StoredData# " << current->DataLen); + } } else if (prev) { // lost item if (prev->Lsn >= Recovered.FirstLsnToKeep) { @@ -475,8 +517,9 @@ public: ui64 writeLogScore = 100; ui64 allocateScore = State.Chunks.size() < 20 ? 10 : 0; ui64 writeScore = State.Chunks.empty() ? 0 : 5; + ui64 forgetScore = ChunksToForget.empty() ? 0 : ChunksToForget.size() < 100 ? 10 : 1000; - ui64 totalScore = writeLogScore + allocateScore + writeScore; + ui64 totalScore = writeLogScore + allocateScore + writeScore + forgetScore; if (!totalScore) { // nothing to do break; @@ -491,6 +534,12 @@ public: IssueAllocateRequest(ctx); } else if ((option -= allocateScore) < writeScore) { IssueWriteRequest(ctx); + } else if ((option -= writeScore) < forgetScore) { + const size_t index = RandomNumber(ChunksToForget.size()); + ui32& chunk = ChunksToForget[index]; + SendPDiskRequest(ctx, new NPDisk::TEvChunkForget(PDiskParams->Owner, PDiskParams->OwnerRound, {chunk}), [] {}); + std::swap(chunk, ChunksToForget.back()); + ChunksToForget.pop_back(); } else { Y_FAIL("unexpected option"); } @@ -513,26 +562,23 @@ public: TState::TChunkInfo& chunk = pair.second; switch (chunk.GetCommitState()) { case ECommitState::RESERVED: - // reserved chunk is a subject for commit; 1% chance to commit chunk - if (RandomNumber<double>() < 0.01) { + // reserved chunk is a subject for commit; 10% chance to commit chunk + if (RandomNumber<double>() < 0.1) { info->CommitChunks.push_back(chunkIdx); chunk.SetCommitState(ECommitState::COMMIT_IN_PROGRESS); + break; } - break; - + [[fallthrough]]; case ECommitState::COMMITTED: - // committed chunk is a subject for deletion; 0.5% change to delete chunk - if (RandomNumber<double>() < 0.005) { - bool hasWrites = false; - for (const TWriteRecord& w : State.WritesInFlight) { - if (w.ChunkIdx == chunkIdx) { - hasWrites = true; - break; + // committed chunk is a subject for deletion; 10% chance to delete chunk + if (RandomNumber<double>() < 0.1) { + for (auto it = State.WritesInFlight.begin(); it != State.WritesInFlight.end(); ) { + if (it->ChunkIdx == chunkIdx) { + it = State.WritesInFlight.erase(it); + } else { + ++it; } } - if (hasWrites) { - break; - } info->DeleteChunks.push_back(chunkIdx); chunk.SetCommitState(ECommitState::DELETE_IN_PROGRESS); } @@ -545,7 +591,7 @@ public: NPDisk::TCommitRecord cr; - // advance LSN every 30000 items avg + // advance LSN every 500 items avg if (Lsn > Params.LsnToKeepCount && RandomNumber<double>() < Params.LogCutProbability) { cr.FirstLsnToKeep = Lsn - Params.LsnToKeepCount; cr.IsStartingPoint = true; // make starting point if we cut log @@ -554,6 +600,7 @@ public: // fill in commit/delete records cr.CommitChunks = info->CommitChunks; cr.DeleteChunks = info->DeleteChunks; + cr.DeleteToDecommitted = RandomNumber(2u); cr.IsStartingPoint = cr.IsStartingPoint || cr.CommitChunks || cr.DeleteChunks; if (cr.FirstLsnToKeep) { @@ -562,19 +609,17 @@ public: Cerr << str.Str(); } - TStringStream msg; - msg << "TEvLog " << SelfInfo() << " Lsn# " << Lsn << " Size# " << info->DataLen; - auto printChunks = [] (TVector<TChunkIdx> chunks) { - bool first = true; - TStringStream str; - for (TChunkIdx chunk : chunks) { - str << (first ? first = false, "" : " ") << chunk; - } - return str.Str(); - }; - msg << " Commit# " << printChunks(cr.CommitChunks) << " Delete# " << printChunks(cr.DeleteChunks) - << " IsStartingPoint# " << (cr.IsStartingPoint ? "true" : "false") << Endl; + if (cr.DeleteToDecommitted) { + ChunksToForget.insert(ChunksToForget.end(), cr.DeleteChunks.begin(), cr.DeleteChunks.end()); + } + if (cr.IsStartingPoint || cr.CommitChunks || cr.DeleteChunks) { + TStringStream msg; + msg << "TEvLog " << SelfInfo() << " Lsn# " << Lsn << " Size# " << info->DataLen + << " Commit# " << FormatList(cr.CommitChunks) << " Delete# " << FormatList(cr.DeleteChunks) + << " IsStartingPoint# " << (cr.IsStartingPoint ? "true" : "false") + << " DeleteToDecommitted# " << (cr.DeleteToDecommitted ? "true" : "false") + << Endl; Cerr << msg.Str(); } @@ -615,6 +660,7 @@ public: // apply chunk commits for (TChunkIdx chunkIdx : info->CommitChunks) { TState::TChunkInfo& chunk = State.GetChunk(chunkIdx); + Cerr << (TStringBuilder() << SelfInfo() << " Chunk COMMITTED ChunkIdx# " << chunkIdx << Endl); chunk.SetCommitState(success ? ECommitState::COMMITTED : ECommitState::RESERVED); } @@ -625,6 +671,7 @@ public: TState::TChunkInfo& chunk = it->second; chunk.SetCommitState(success ? ECommitState::DELETED : ECommitState::COMMITTED); if (chunk.GetCommitState() == ECommitState::DELETED) { + Cerr << (TStringBuilder() << SelfInfo() << " Chunk DELETED ChunkIdx# " << chunkIdx << Endl); State.Chunks.erase(it); } } @@ -749,6 +796,11 @@ public: void Handle(NPDisk::TEvChunkWriteResult::TPtr& ev, const TActorContext& ctx) { auto *msg = ev->Get(); + if (const auto it = State.Chunks.find(msg->ChunkIdx); it == State.Chunks.end() || + it->second.GetCommitState() == ECommitState::DELETE_IN_PROGRESS) { + return Activity(ctx); // ignore this write result + } + Y_VERIFY(msg->Status == NKikimrProto::OK); StateManager->ExecuteConsistentAction([&] { @@ -810,6 +862,7 @@ public: || chunk.GetCommitState() == ECommitState::DELETE_IN_PROGRESS, "ChunkIdx# %" PRIu32 " CommitState# %" PRIu32, msg->ChunkIdx, static_cast<ui32>(chunk.GetCommitState())); + const bool wasDeleteInProgress = chunk.GetCommitState() == ECommitState::DELETE_IN_PROGRESS; chunk.SetCommitState(ECommitState::COMMITTED); ui32 offset = 0; @@ -838,7 +891,7 @@ public: break; } } - Y_VERIFY(it != Recovered.WritesInFlight.end() || chunk.VerifyChecksum(i, checksum), + Y_VERIFY(it != Recovered.WritesInFlight.end() || chunk.VerifyChecksum(i, checksum) || wasDeleteInProgress, "inconsistent chunk data ChunkIdx# %" PRIu32 " OffsetInBlocks# %" PRIu32 " Used# %s" " Checksum# %08" PRIx32 " StoredChecksum# %08" PRIx32 " WritesInFlight# %s", msg->ChunkIdx, i, chunk.IsUsed(i) ? "true" : "false", checksum, chunk.GetChecksum(i), s.Str().data()); @@ -889,6 +942,12 @@ public: return data; } + void Handle(NPDisk::TEvChunkForgetResult::TPtr& ev, const TActorContext& /*ctx*/) { + if (ev->Get()->Status != NKikimrProto::OK) { + Cerr << (TStringBuilder() << VDiskId << " TEvChunkForgetResult# " << ev->Get()->ToString() << Endl); + } + } + STFUNC(StateFunc) { switch (const ui32 type = ev->GetTypeRewrite()) { CFunc(TEvents::TSystem::Bootstrap, Bootstrap); @@ -898,6 +957,7 @@ public: HFunc(NPDisk::TEvChunkReserveResult, Handle); HFunc(NPDisk::TEvChunkWriteResult, Handle); HFunc(NPDisk::TEvChunkReadResult, Handle); + HFunc(NPDisk::TEvChunkForgetResult, Handle); default: Y_FAIL("unexpected message 0x%08" PRIx32, type); } } diff --git a/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.h b/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.h index f9e1bcf05bb..e046fdb8472 100644 --- a/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.h +++ b/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.h @@ -19,10 +19,10 @@ struct TFakeVDiskParams { // LogRecord size distribution ui32 SizeMin = 1000; - ui32 SizeMax = 2000; + ui32 SizeMax = 100000; ui32 LsnToKeepCount = 1000; - double LogCutProbability = 1.0 / 30000; + double LogCutProbability = 1.0 / 500; }; diff --git a/ydb/core/blobstorage/ut_pdiskfit/lib/fail_injection_test.h b/ydb/core/blobstorage/ut_pdiskfit/lib/fail_injection_test.h index 7b7abf4818e..c53155f3669 100644 --- a/ydb/core/blobstorage/ut_pdiskfit/lib/fail_injection_test.h +++ b/ydb/core/blobstorage/ut_pdiskfit/lib/fail_injection_test.h @@ -37,7 +37,11 @@ public: AtomicSet(FailCounter, failCounter); } - void Inject(ui64 /*cookie*/) { + void Inject(ui64 cookie) { + if (cookie == 2 && RandomNumber(10u) == 0) { + AtomicSet(FailCounter, 1 + RandomNumber(24u)); + } + TAtomicBase result = AtomicDecrement(FailCounter); if (result < 0) { // overshoot, return one position back @@ -91,10 +95,10 @@ ui32 GenerateFailCounter(bool frequentFails) { double p = (rng() % (1000 * 1000 * 1000)) / 1e9; - if (frequentFails) { + if (!frequentFails) { return p < 0.05 ? rng() % 10 + 1 - : p < 0.10 ? rng() % 1000 + 1000 : - rng() % 5000 + 5000; + : p < 0.10 ? rng() % 100 + 100 : + rng() % 1000 + 1000; } else { return p < 0.9 ? rng() % 10 + 1 : p < 0.99 ? rng() % 100 + 100 : @@ -183,10 +187,10 @@ struct TPDiskFailureInjectionTest { setup->NodeId = 1; setup->ExecutorsCount = 4; // system, user, io, batch setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]); - setup->Executors[0] = new TBasicExecutorPool(AppData->SystemPoolId, 8, 10); - setup->Executors[1] = new TBasicExecutorPool(AppData->UserPoolId, 8, 10); + setup->Executors[0] = new TBasicExecutorPool(AppData->SystemPoolId, 16, 10); + setup->Executors[1] = new TBasicExecutorPool(AppData->UserPoolId, 1, 10); setup->Executors[2] = new TIOExecutorPool(AppData->IOPoolId, 10); - setup->Executors[3] = new TBasicExecutorPool(AppData->BatchPoolId, 8, 10); + setup->Executors[3] = new TBasicExecutorPool(AppData->BatchPoolId, 1, 10); setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 100)); // initialize logger settings @@ -214,7 +218,7 @@ struct TPDiskFailureInjectionTest { // create/register logger actor auto logger = std::make_unique<TLoggerActor>(loggerSettings, CreateStderrBackend(), Counters->GetSubgroup("logger", "counters")); - setup->LocalServices.emplace_back(loggerId, TActorSetupCmd(logger.release(), TMailboxType::Simple, 0)); + setup->LocalServices.emplace_back(loggerId, TActorSetupCmd(logger.release(), TMailboxType::Simple, AppData->IOPoolId)); // create and then initialize actor system ActorSystem = std::make_unique<TActorSystem>(setup, AppData.get(), loggerSettings); |