diff options
author | Semyon Danilov <senya@ydb.tech> | 2024-12-12 15:54:15 +0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-12 15:54:15 +0400 |
commit | eafe9a1c693e0ccc99f0da1c4dcb1ed7d71e43cd (patch) | |
tree | 638c695cf5803005b13a40f6200b00c94181b1b4 | |
parent | 61be59ff1afdd14ef115af0e4e3c8808e6df5e1f (diff) | |
download | ydb-eafe9a1c693e0ccc99f0da1c4dcb1ed7d71e43cd.tar.gz |
Put log chunks from killed owners on quarantine, if owner is still reading log (#11795)
7 files changed, 260 insertions, 23 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp index 40516d8477..a63a204f2a 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp @@ -2112,16 +2112,20 @@ void TPDisk::KillOwner(TOwner owner, TOwnerRound killOwnerRound, TCompletionEven } TryTrimChunk(false, 0, NWilson::TSpan{}); + bool readingLog = OwnerData[owner].ReadingLog(); ui64 lastSeenLsn = 0; auto it = LogChunks.begin(); while (it != LogChunks.end()) { if (it->OwnerLsnRange.size() > owner && it->OwnerLsnRange[owner].IsPresent) { - Y_ABORT_UNLESS(it->CurrentUserCount > 0); - it->CurrentUserCount--; - it->OwnerLsnRange[owner].IsPresent = false; - it->OwnerLsnRange[owner].FirstLsn = 0; lastSeenLsn = Max(it->OwnerLsnRange[owner].LastLsn, lastSeenLsn); - it->OwnerLsnRange[owner].LastLsn = 0; + + if (!readingLog) { + Y_ABORT_UNLESS(it->CurrentUserCount > 0); + it->CurrentUserCount--; + it->OwnerLsnRange[owner].IsPresent = false; + it->OwnerLsnRange[owner].FirstLsn = 0; + it->OwnerLsnRange[owner].LastLsn = 0; + } } ++it; } @@ -2372,22 +2376,51 @@ void TPDisk::ClearQuarantineChunks() { *Mon.QuarantineChunks = QuarantineChunks.size(); } + bool haveChunksToRelease = false; + { const auto it = std::partition(QuarantineOwners.begin(), QuarantineOwners.end(), [&] (TOwner i) { return Keeper.GetOwnerUsed(i) || OwnerData[i].HaveRequestsInFlight(); }); for (auto delIt = it; delIt != QuarantineOwners.end(); ++delIt) { - ADD_RECORD_WITH_TIMESTAMP_TO_OPERATION_LOG(OwnerData[*delIt].OperationLog, "Remove owner from quarantine, OwnerId# " << *delIt); - TOwnerRound ownerRound = OwnerData[*delIt].OwnerRound; - OwnerData[*delIt].Reset(false); - OwnerData[*delIt].OwnerRound = ownerRound; - Keeper.RemoveOwner(*delIt); - P_LOG(PRI_NOTICE, BPD01, "removed owner from chunks Keeper through QuarantineOwners", - (OwnerId, (ui32)*delIt)); + TOwner owner = *delIt; + ADD_RECORD_WITH_TIMESTAMP_TO_OPERATION_LOG(OwnerData[owner].OperationLog, "Remove owner from quarantine, OwnerId# " << owner); + TOwnerRound ownerRound = OwnerData[owner].OwnerRound; + OwnerData[owner].Reset(false); + OwnerData[owner].OwnerRound = ownerRound; + Keeper.RemoveOwner(owner); + + ui64 lastSeenLsn = 0; + auto it = LogChunks.begin(); + while (it != LogChunks.end()) { + if (it->OwnerLsnRange.size() > owner && it->OwnerLsnRange[owner].IsPresent) { + Y_ABORT_UNLESS(it->CurrentUserCount > 0); + ui32 userCount = --it->CurrentUserCount; + it->OwnerLsnRange[owner].IsPresent = false; + it->OwnerLsnRange[owner].FirstLsn = 0; + lastSeenLsn = Max(it->OwnerLsnRange[owner].LastLsn, lastSeenLsn); + it->OwnerLsnRange[owner].LastLsn = 0; + + if (userCount == 0) { + haveChunksToRelease = true; + } + } + ++it; + } + + P_LOG(PRI_NOTICE, BPD01, "removed owner from chunks Keeper through QuarantineOwners" << (haveChunksToRelease ? " along with log chunks" : ""), + (OwnerId, (ui32)owner), (LastSeenLsn, lastSeenLsn)); } QuarantineOwners.erase(it, QuarantineOwners.end()); *Mon.QuarantineOwners = QuarantineOwners.size(); } + + if (haveChunksToRelease) { + THolder<TCompletionEventSender> completion(new TCompletionEventSender(this)); + if (ReleaseUnusedLogChunks(completion.Get())) { + WriteSysLogRestorePoint(completion.Release(), TReqId(TReqId::KillOwnerSysLog, 0), {}); + } + } } // Should be called to initiate TRIM (on chunk delete or prev trim done) diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h index b3779de337..1964bf297a 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h @@ -148,7 +148,7 @@ public: ui64 InsaneLogChunks = 0; // Set when pdisk sees insanely large log, to give vdisks a chance to cut it ui32 FirstLogChunkToParseCommits = 0; - // Chunks that is owned by killed owner, but has operations InFlight + // Chunks that are owned by killed owner, but have operations InFlight TVector<TChunkIdx> QuarantineChunks; TVector<TOwner> QuarantineOwners; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp index 46c380840a..30ca52fb9b 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp @@ -797,12 +797,30 @@ bool TLogReader::ProcessSectorSet(TSectorData *sector) { P_LOG(PRI_NOTICE, LR018, SelfInfo() << " In ProcessSectorSet got !restorator.GoodSectorFlags", (LastGoodToWriteLogPosition, LastGoodToWriteLogPosition)); } else { - Y_VERIFY_S(ChunkIdx == LogEndChunkIdx && SectorIdx >= LogEndSectorIdx, SelfInfo() - << " File# " << __FILE__ - << " Line# " << __LINE__ - << " LogEndChunkIdx# " << LogEndChunkIdx - << " LogEndSectorIdx# " << LogEndSectorIdx); - if (!(ChunkIdx == LogEndChunkIdx && SectorIdx >= LogEndSectorIdx)) { + bool outsideLogEnd = ChunkIdx == LogEndChunkIdx && SectorIdx >= LogEndSectorIdx; + + if (!outsideLogEnd) { + // If read invalid data from the log (but not outside this owner's log bounds), check if the owner is on quarantine. + TGuard<TMutex> guard(PDisk->StateMutex); + TOwnerData &ownerData = PDisk->OwnerData[Owner]; + + if (ownerData.OnQuarantine) { + P_LOG(PRI_WARN, LR019, SelfInfo() + << " In ProcessSectorSet got !restorator.GoodSectorFlags with owner on quarantine", + (LogEndChunkIdx, LogEndChunkIdx), (LogEndSectorIdx, LogEndSectorIdx)); + ReplyOk(); + return true; + } + } + + Y_VERIFY_S(outsideLogEnd, SelfInfo() + << " File# " << __FILE__ + << " Line# " << __LINE__ + << " LogEndChunkIdx# " << LogEndChunkIdx + << " LogEndSectorIdx# " << LogEndSectorIdx); + + if (outsideLogEnd) { + // It's ok. P_LOG(PRI_WARN, LR004, SelfInfo() << " In ProcessSectorSet got !restorator.GoodSectorFlags outside the LogEndSector", (LogEndChunkIdx, LogEndChunkIdx), (LogEndSectorIdx, LogEndSectorIdx)); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h index b14b5e20fb..52fe2074f3 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h @@ -23,7 +23,7 @@ enum class EInitPhase { }; enum EOwner { - OwnerSystem = 0, // Chunk0, SysLog chunks and CommonLog + just common log tracking, mens "for dynamic" in requests + OwnerSystem = 0, // Chunk0, SysLog chunks and CommonLog + just common log tracking, means "for dynamic" in requests OwnerUnallocated = 1, // Unallocated chunks, Trim scheduling, Slay commands OwnerBeginUser = 2, OwnerEndUser = 241, @@ -157,6 +157,10 @@ struct TOwnerData { return LogReader || InFlight->ChunkWrites.load() || InFlight->ChunkReads.load() || InFlight->LogWrites.load(); } + bool ReadingLog() const { + return bool(LogReader); + } + TString ToString() const { TStringStream str; str << "TOwnerData {"; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h index ba195851f1..39ffff7e41 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h @@ -29,6 +29,7 @@ public: bool SmallDisk = false; bool SuppressCompatibilityCheck = false; bool UseSectorMap = true; + TAutoPtr<TLogBackend> LogBackend = nullptr; }; private: @@ -73,7 +74,11 @@ public: IoContext = std::make_shared<NPDisk::TIoContextFactoryOSS>(); appData->IoContextFactory = IoContext.get(); - Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend()); + if (Settings.LogBackend) { + Runtime->SetLogBackend(Settings.LogBackend); + } else { + Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend()); + } Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}, {}}); Runtime->SetLogPriority(NKikimrServices::BS_PDISK, NLog::PRI_NOTICE); Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SYSLOG, NLog::PRI_NOTICE); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp index c74a8a94db..19a07954c5 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp @@ -250,11 +250,11 @@ Y_UNIT_TEST_SUITE(TPDiskRaces) { } Y_UNIT_TEST(KillOwnerWhileDecommittingWithInflight) { - TestKillOwnerWhileDecommitting(false, 20, 0, 10, 100); + TestKillOwnerWhileDecommitting(false, 20, 50, 10, 100); } Y_UNIT_TEST(KillOwnerWhileDecommittingWithInflightMock) { - TestKillOwnerWhileDecommitting(true, 20, 0, 10, 100); + TestKillOwnerWhileDecommitting(true, 20, 50, 10, 100); } void OwnerRecreationRaces(bool usePDiskMock, ui32 timeLimit, ui32 vdisksNum) { @@ -321,6 +321,173 @@ Y_UNIT_TEST_SUITE(TPDiskRaces) { Y_UNIT_TEST(OwnerRecreationRaces) { OwnerRecreationRaces(false, 20, 1); } + + void TestKillOwnerWhileReadingLog(ui32 timeLimit) { + // This test is not deterministic, so we run it multiple times to increase the chance of catching the bug. + // We expect to see quarantined log chunks in the log at least once (however locally it was seen every time). + // The original bug was crashing the server, so this test also tests this and that's why it doesn't break the cycle + // upon encountering quarantined log chunks. + bool capturedQuarantinedLogChunks = false; + THPTimer timer; + while (timer.Passed() < timeLimit) { + TStringStream ss; + + TActorTestContext testCtx({ + .IsBad = false, + .UsePDiskMock = false, + .LogBackend = new TStreamLogBackend(&ss), + }); + const TString data = PrepareData(10_MB); + + auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) { + TString dataCopy = data; + auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, TRcBuf(dataCopy), + mock.GetLsnSeg(), nullptr); + evLog->Signature.SetCommitRecord(); + evLog->CommitRecord = std::move(rec); + testCtx.Send(evLog.Release()); + }; + + TVDiskMock mock(&testCtx); + mock.Init(); + + for (ui32 i = 0; i < 20; ++i) { + NPDisk::TCommitRecord rec; + logNoTest(mock, rec); + testCtx.Recv<NPDisk::TEvLogResult>(); + } + + testCtx.RestartPDiskSync(); + + mock.Init(); + + NPDisk::TLogPosition position{0, 0}; + + bool readCallbackCalled = false; + + testCtx.TestCtx.SectorMap->SetReadCallback([&]() { + if (!readCallbackCalled) { + readCallbackCalled = true; + + testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound)); + } + }); + + testCtx.Send(new NPDisk::TEvReadLog(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, position)); + + testCtx.Recv<NPDisk::TEvHarakiriResult>(); + + { + TVDiskMock mock(&testCtx); + mock.Init(); + + for (ui32 i = 0; i < 13; ++i) { + NPDisk::TCommitRecord rec; + logNoTest(mock, rec); + testCtx.Recv<NPDisk::TEvLogResult>(); + } + } + + if (!capturedQuarantinedLogChunks) { + TString log = ss.Str(); + capturedQuarantinedLogChunks = log.Contains("along with log chunks"); + } + } + + UNIT_ASSERT(capturedQuarantinedLogChunks); + } + + Y_UNIT_TEST(OwnerKilledWhileReadingLog) { + TestKillOwnerWhileReadingLog(20); + } + + void TestKillOwnerWhileReadingLogAndThenKillLastOwner(ui32 timeLimit) { + bool capturedQuarantinedLogChunks = false; + THPTimer timer; + while (timer.Passed() < timeLimit) { + TStringStream ss; + + TActorTestContext testCtx({ + .IsBad = false, + .UsePDiskMock = false, + .LogBackend = new TStreamLogBackend(&ss), + }); + const TString data = PrepareData(10_MB); + + auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) { + TString dataCopy = data; + auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, TRcBuf(dataCopy), + mock.GetLsnSeg(), nullptr); + evLog->Signature.SetCommitRecord(); + evLog->CommitRecord = std::move(rec); + testCtx.Send(evLog.Release()); + }; + + TVDiskMock mock1(&testCtx); + mock1.Init(); + + TVDiskMock mock2(&testCtx); + mock2.Init(); + + for (ui32 i = 0; i < 20; ++i) { + { + NPDisk::TCommitRecord rec; + logNoTest(mock1, rec); + testCtx.Recv<NPDisk::TEvLogResult>(); + } + { + NPDisk::TCommitRecord rec; + logNoTest(mock2, rec); + testCtx.Recv<NPDisk::TEvLogResult>(); + } + } + + testCtx.RestartPDiskSync(); + + mock1.Init(); + mock2.InitFull(); + + NPDisk::TLogPosition position{0, 0}; + + bool readCallbackCalled = false; + + testCtx.TestCtx.SectorMap->SetReadCallback([&]() { + if (!readCallbackCalled) { + readCallbackCalled = true; + + testCtx.Send(new NPDisk::TEvHarakiri(mock1.PDiskParams->Owner, mock1.PDiskParams->OwnerRound)); + testCtx.Send(new NPDisk::TEvHarakiri(mock2.PDiskParams->Owner, mock2.PDiskParams->OwnerRound)); + } + }); + + testCtx.Send(new NPDisk::TEvReadLog(mock1.PDiskParams->Owner, mock1.PDiskParams->OwnerRound, position)); + + testCtx.Recv<NPDisk::TEvHarakiriResult>(); + testCtx.Recv<NPDisk::TEvHarakiriResult>(); + + { + TVDiskMock mock(&testCtx); + mock.Init(); + + for (ui32 i = 0; i < 30; ++i) { + NPDisk::TCommitRecord rec; + logNoTest(mock, rec); + testCtx.Recv<NPDisk::TEvLogResult>(); + } + } + + if (!capturedQuarantinedLogChunks) { + TString log = ss.Str(); + capturedQuarantinedLogChunks = log.Contains("along with log chunks"); + } + } + + UNIT_ASSERT(capturedQuarantinedLogChunks); + } + + Y_UNIT_TEST(OwnerKilledWhileReadingLogAndThenKillLastOwner) { + TestKillOwnerWhileReadingLogAndThenKillLastOwner(20); + } } } diff --git a/ydb/library/pdisk_io/sector_map.h b/ydb/library/pdisk_io/sector_map.h index 1bad74f093..d88eab9e6f 100644 --- a/ydb/library/pdisk_io/sector_map.h +++ b/ydb/library/pdisk_io/sector_map.h @@ -200,6 +200,7 @@ private: THashMap<ui64, TString> Map; NSectorMap::EDiskMode DiskMode = NSectorMap::DM_NONE; THolder<NSectorMap::TSectorOperationThrottler> SectorOperationThrottler; + std::function<void()> ReadCallback = nullptr; public: TSectorMap(ui64 deviceSize = 0, NSectorMap::EDiskMode diskMode = NSectorMap::DM_NONE) @@ -272,6 +273,10 @@ public: if (SectorOperationThrottler.Get() != nullptr) { SectorOperationThrottler->ThrottleRead(dataSize, dataOffset, prevOperationIsInProgress, timer.Passed() * 1000); } + + if (ReadCallback) { + ReadCallback(); + } } void Write(const ui8 *data, i64 size, ui64 offset, bool prevOperationIsInProgress = false) { @@ -324,6 +329,11 @@ public: return Map.size() * NSectorMap::SECTOR_SIZE; } + void SetReadCallback(std::function<void()> callback) { + TGuard<TTicketLock> guard(MapLock); + ReadCallback = callback; + } + TString ToString() const { TStringStream str; str << "Serial# " << Serial.Quote() << "\n"; |