aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSemyon Danilov <senya@ydb.tech>2024-12-12 15:54:15 +0400
committerGitHub <noreply@github.com>2024-12-12 15:54:15 +0400
commiteafe9a1c693e0ccc99f0da1c4dcb1ed7d71e43cd (patch)
tree638c695cf5803005b13a40f6200b00c94181b1b4
parent61be59ff1afdd14ef115af0e4e3c8808e6df5e1f (diff)
downloadydb-eafe9a1c693e0ccc99f0da1c4dcb1ed7d71e43cd.tar.gz
Put log chunks from killed owners on quarantine, if owner is still reading log (#11795)
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp57
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h2
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp30
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h6
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h7
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp171
-rw-r--r--ydb/library/pdisk_io/sector_map.h10
7 files changed, 260 insertions, 23 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
index 40516d8477..a63a204f2a 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
@@ -2112,16 +2112,20 @@ void TPDisk::KillOwner(TOwner owner, TOwnerRound killOwnerRound, TCompletionEven
}
TryTrimChunk(false, 0, NWilson::TSpan{});
+ bool readingLog = OwnerData[owner].ReadingLog();
ui64 lastSeenLsn = 0;
auto it = LogChunks.begin();
while (it != LogChunks.end()) {
if (it->OwnerLsnRange.size() > owner && it->OwnerLsnRange[owner].IsPresent) {
- Y_ABORT_UNLESS(it->CurrentUserCount > 0);
- it->CurrentUserCount--;
- it->OwnerLsnRange[owner].IsPresent = false;
- it->OwnerLsnRange[owner].FirstLsn = 0;
lastSeenLsn = Max(it->OwnerLsnRange[owner].LastLsn, lastSeenLsn);
- it->OwnerLsnRange[owner].LastLsn = 0;
+
+ if (!readingLog) {
+ Y_ABORT_UNLESS(it->CurrentUserCount > 0);
+ it->CurrentUserCount--;
+ it->OwnerLsnRange[owner].IsPresent = false;
+ it->OwnerLsnRange[owner].FirstLsn = 0;
+ it->OwnerLsnRange[owner].LastLsn = 0;
+ }
}
++it;
}
@@ -2372,22 +2376,51 @@ void TPDisk::ClearQuarantineChunks() {
*Mon.QuarantineChunks = QuarantineChunks.size();
}
+ bool haveChunksToRelease = false;
+
{
const auto it = std::partition(QuarantineOwners.begin(), QuarantineOwners.end(), [&] (TOwner i) {
return Keeper.GetOwnerUsed(i) || OwnerData[i].HaveRequestsInFlight();
});
for (auto delIt = it; delIt != QuarantineOwners.end(); ++delIt) {
- ADD_RECORD_WITH_TIMESTAMP_TO_OPERATION_LOG(OwnerData[*delIt].OperationLog, "Remove owner from quarantine, OwnerId# " << *delIt);
- TOwnerRound ownerRound = OwnerData[*delIt].OwnerRound;
- OwnerData[*delIt].Reset(false);
- OwnerData[*delIt].OwnerRound = ownerRound;
- Keeper.RemoveOwner(*delIt);
- P_LOG(PRI_NOTICE, BPD01, "removed owner from chunks Keeper through QuarantineOwners",
- (OwnerId, (ui32)*delIt));
+ TOwner owner = *delIt;
+ ADD_RECORD_WITH_TIMESTAMP_TO_OPERATION_LOG(OwnerData[owner].OperationLog, "Remove owner from quarantine, OwnerId# " << owner);
+ TOwnerRound ownerRound = OwnerData[owner].OwnerRound;
+ OwnerData[owner].Reset(false);
+ OwnerData[owner].OwnerRound = ownerRound;
+ Keeper.RemoveOwner(owner);
+
+ ui64 lastSeenLsn = 0;
+ auto it = LogChunks.begin();
+ while (it != LogChunks.end()) {
+ if (it->OwnerLsnRange.size() > owner && it->OwnerLsnRange[owner].IsPresent) {
+ Y_ABORT_UNLESS(it->CurrentUserCount > 0);
+ ui32 userCount = --it->CurrentUserCount;
+ it->OwnerLsnRange[owner].IsPresent = false;
+ it->OwnerLsnRange[owner].FirstLsn = 0;
+ lastSeenLsn = Max(it->OwnerLsnRange[owner].LastLsn, lastSeenLsn);
+ it->OwnerLsnRange[owner].LastLsn = 0;
+
+ if (userCount == 0) {
+ haveChunksToRelease = true;
+ }
+ }
+ ++it;
+ }
+
+ P_LOG(PRI_NOTICE, BPD01, "removed owner from chunks Keeper through QuarantineOwners" << (haveChunksToRelease ? " along with log chunks" : ""),
+ (OwnerId, (ui32)owner), (LastSeenLsn, lastSeenLsn));
}
QuarantineOwners.erase(it, QuarantineOwners.end());
*Mon.QuarantineOwners = QuarantineOwners.size();
}
+
+ if (haveChunksToRelease) {
+ THolder<TCompletionEventSender> completion(new TCompletionEventSender(this));
+ if (ReleaseUnusedLogChunks(completion.Get())) {
+ WriteSysLogRestorePoint(completion.Release(), TReqId(TReqId::KillOwnerSysLog, 0), {});
+ }
+ }
}
// Should be called to initiate TRIM (on chunk delete or prev trim done)
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
index b3779de337..1964bf297a 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
@@ -148,7 +148,7 @@ public:
ui64 InsaneLogChunks = 0; // Set when pdisk sees insanely large log, to give vdisks a chance to cut it
ui32 FirstLogChunkToParseCommits = 0;
- // Chunks that is owned by killed owner, but has operations InFlight
+ // Chunks that are owned by killed owner, but have operations InFlight
TVector<TChunkIdx> QuarantineChunks;
TVector<TOwner> QuarantineOwners;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp
index 46c380840a..30ca52fb9b 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp
@@ -797,12 +797,30 @@ bool TLogReader::ProcessSectorSet(TSectorData *sector) {
P_LOG(PRI_NOTICE, LR018, SelfInfo() << " In ProcessSectorSet got !restorator.GoodSectorFlags",
(LastGoodToWriteLogPosition, LastGoodToWriteLogPosition));
} else {
- Y_VERIFY_S(ChunkIdx == LogEndChunkIdx && SectorIdx >= LogEndSectorIdx, SelfInfo()
- << " File# " << __FILE__
- << " Line# " << __LINE__
- << " LogEndChunkIdx# " << LogEndChunkIdx
- << " LogEndSectorIdx# " << LogEndSectorIdx);
- if (!(ChunkIdx == LogEndChunkIdx && SectorIdx >= LogEndSectorIdx)) {
+ bool outsideLogEnd = ChunkIdx == LogEndChunkIdx && SectorIdx >= LogEndSectorIdx;
+
+ if (!outsideLogEnd) {
+ // If read invalid data from the log (but not outside this owner's log bounds), check if the owner is on quarantine.
+ TGuard<TMutex> guard(PDisk->StateMutex);
+ TOwnerData &ownerData = PDisk->OwnerData[Owner];
+
+ if (ownerData.OnQuarantine) {
+ P_LOG(PRI_WARN, LR019, SelfInfo()
+ << " In ProcessSectorSet got !restorator.GoodSectorFlags with owner on quarantine",
+ (LogEndChunkIdx, LogEndChunkIdx), (LogEndSectorIdx, LogEndSectorIdx));
+ ReplyOk();
+ return true;
+ }
+ }
+
+ Y_VERIFY_S(outsideLogEnd, SelfInfo()
+ << " File# " << __FILE__
+ << " Line# " << __LINE__
+ << " LogEndChunkIdx# " << LogEndChunkIdx
+ << " LogEndSectorIdx# " << LogEndSectorIdx);
+
+ if (outsideLogEnd) {
+ // It's ok.
P_LOG(PRI_WARN, LR004, SelfInfo()
<< " In ProcessSectorSet got !restorator.GoodSectorFlags outside the LogEndSector",
(LogEndChunkIdx, LogEndChunkIdx), (LogEndSectorIdx, LogEndSectorIdx));
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
index b14b5e20fb..52fe2074f3 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
@@ -23,7 +23,7 @@ enum class EInitPhase {
};
enum EOwner {
- OwnerSystem = 0, // Chunk0, SysLog chunks and CommonLog + just common log tracking, mens "for dynamic" in requests
+ OwnerSystem = 0, // Chunk0, SysLog chunks and CommonLog + just common log tracking, means "for dynamic" in requests
OwnerUnallocated = 1, // Unallocated chunks, Trim scheduling, Slay commands
OwnerBeginUser = 2,
OwnerEndUser = 241,
@@ -157,6 +157,10 @@ struct TOwnerData {
return LogReader || InFlight->ChunkWrites.load() || InFlight->ChunkReads.load() || InFlight->LogWrites.load();
}
+ bool ReadingLog() const {
+ return bool(LogReader);
+ }
+
TString ToString() const {
TStringStream str;
str << "TOwnerData {";
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
index ba195851f1..39ffff7e41 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
@@ -29,6 +29,7 @@ public:
bool SmallDisk = false;
bool SuppressCompatibilityCheck = false;
bool UseSectorMap = true;
+ TAutoPtr<TLogBackend> LogBackend = nullptr;
};
private:
@@ -73,7 +74,11 @@ public:
IoContext = std::make_shared<NPDisk::TIoContextFactoryOSS>();
appData->IoContextFactory = IoContext.get();
- Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend());
+ if (Settings.LogBackend) {
+ Runtime->SetLogBackend(Settings.LogBackend);
+ } else {
+ Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend());
+ }
Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}, {}});
Runtime->SetLogPriority(NKikimrServices::BS_PDISK, NLog::PRI_NOTICE);
Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SYSLOG, NLog::PRI_NOTICE);
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp
index c74a8a94db..19a07954c5 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp
@@ -250,11 +250,11 @@ Y_UNIT_TEST_SUITE(TPDiskRaces) {
}
Y_UNIT_TEST(KillOwnerWhileDecommittingWithInflight) {
- TestKillOwnerWhileDecommitting(false, 20, 0, 10, 100);
+ TestKillOwnerWhileDecommitting(false, 20, 50, 10, 100);
}
Y_UNIT_TEST(KillOwnerWhileDecommittingWithInflightMock) {
- TestKillOwnerWhileDecommitting(true, 20, 0, 10, 100);
+ TestKillOwnerWhileDecommitting(true, 20, 50, 10, 100);
}
void OwnerRecreationRaces(bool usePDiskMock, ui32 timeLimit, ui32 vdisksNum) {
@@ -321,6 +321,173 @@ Y_UNIT_TEST_SUITE(TPDiskRaces) {
Y_UNIT_TEST(OwnerRecreationRaces) {
OwnerRecreationRaces(false, 20, 1);
}
+
+ void TestKillOwnerWhileReadingLog(ui32 timeLimit) {
+ // This test is not deterministic, so we run it multiple times to increase the chance of catching the bug.
+ // We expect to see quarantined log chunks in the log at least once (however locally it was seen every time).
+ // The original bug was crashing the server, so this test also tests this and that's why it doesn't break the cycle
+ // upon encountering quarantined log chunks.
+ bool capturedQuarantinedLogChunks = false;
+ THPTimer timer;
+ while (timer.Passed() < timeLimit) {
+ TStringStream ss;
+
+ TActorTestContext testCtx({
+ .IsBad = false,
+ .UsePDiskMock = false,
+ .LogBackend = new TStreamLogBackend(&ss),
+ });
+ const TString data = PrepareData(10_MB);
+
+ auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) {
+ TString dataCopy = data;
+ auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, TRcBuf(dataCopy),
+ mock.GetLsnSeg(), nullptr);
+ evLog->Signature.SetCommitRecord();
+ evLog->CommitRecord = std::move(rec);
+ testCtx.Send(evLog.Release());
+ };
+
+ TVDiskMock mock(&testCtx);
+ mock.Init();
+
+ for (ui32 i = 0; i < 20; ++i) {
+ NPDisk::TCommitRecord rec;
+ logNoTest(mock, rec);
+ testCtx.Recv<NPDisk::TEvLogResult>();
+ }
+
+ testCtx.RestartPDiskSync();
+
+ mock.Init();
+
+ NPDisk::TLogPosition position{0, 0};
+
+ bool readCallbackCalled = false;
+
+ testCtx.TestCtx.SectorMap->SetReadCallback([&]() {
+ if (!readCallbackCalled) {
+ readCallbackCalled = true;
+
+ testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound));
+ }
+ });
+
+ testCtx.Send(new NPDisk::TEvReadLog(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, position));
+
+ testCtx.Recv<NPDisk::TEvHarakiriResult>();
+
+ {
+ TVDiskMock mock(&testCtx);
+ mock.Init();
+
+ for (ui32 i = 0; i < 13; ++i) {
+ NPDisk::TCommitRecord rec;
+ logNoTest(mock, rec);
+ testCtx.Recv<NPDisk::TEvLogResult>();
+ }
+ }
+
+ if (!capturedQuarantinedLogChunks) {
+ TString log = ss.Str();
+ capturedQuarantinedLogChunks = log.Contains("along with log chunks");
+ }
+ }
+
+ UNIT_ASSERT(capturedQuarantinedLogChunks);
+ }
+
+ Y_UNIT_TEST(OwnerKilledWhileReadingLog) {
+ TestKillOwnerWhileReadingLog(20);
+ }
+
+ void TestKillOwnerWhileReadingLogAndThenKillLastOwner(ui32 timeLimit) {
+ bool capturedQuarantinedLogChunks = false;
+ THPTimer timer;
+ while (timer.Passed() < timeLimit) {
+ TStringStream ss;
+
+ TActorTestContext testCtx({
+ .IsBad = false,
+ .UsePDiskMock = false,
+ .LogBackend = new TStreamLogBackend(&ss),
+ });
+ const TString data = PrepareData(10_MB);
+
+ auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) {
+ TString dataCopy = data;
+ auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, TRcBuf(dataCopy),
+ mock.GetLsnSeg(), nullptr);
+ evLog->Signature.SetCommitRecord();
+ evLog->CommitRecord = std::move(rec);
+ testCtx.Send(evLog.Release());
+ };
+
+ TVDiskMock mock1(&testCtx);
+ mock1.Init();
+
+ TVDiskMock mock2(&testCtx);
+ mock2.Init();
+
+ for (ui32 i = 0; i < 20; ++i) {
+ {
+ NPDisk::TCommitRecord rec;
+ logNoTest(mock1, rec);
+ testCtx.Recv<NPDisk::TEvLogResult>();
+ }
+ {
+ NPDisk::TCommitRecord rec;
+ logNoTest(mock2, rec);
+ testCtx.Recv<NPDisk::TEvLogResult>();
+ }
+ }
+
+ testCtx.RestartPDiskSync();
+
+ mock1.Init();
+ mock2.InitFull();
+
+ NPDisk::TLogPosition position{0, 0};
+
+ bool readCallbackCalled = false;
+
+ testCtx.TestCtx.SectorMap->SetReadCallback([&]() {
+ if (!readCallbackCalled) {
+ readCallbackCalled = true;
+
+ testCtx.Send(new NPDisk::TEvHarakiri(mock1.PDiskParams->Owner, mock1.PDiskParams->OwnerRound));
+ testCtx.Send(new NPDisk::TEvHarakiri(mock2.PDiskParams->Owner, mock2.PDiskParams->OwnerRound));
+ }
+ });
+
+ testCtx.Send(new NPDisk::TEvReadLog(mock1.PDiskParams->Owner, mock1.PDiskParams->OwnerRound, position));
+
+ testCtx.Recv<NPDisk::TEvHarakiriResult>();
+ testCtx.Recv<NPDisk::TEvHarakiriResult>();
+
+ {
+ TVDiskMock mock(&testCtx);
+ mock.Init();
+
+ for (ui32 i = 0; i < 30; ++i) {
+ NPDisk::TCommitRecord rec;
+ logNoTest(mock, rec);
+ testCtx.Recv<NPDisk::TEvLogResult>();
+ }
+ }
+
+ if (!capturedQuarantinedLogChunks) {
+ TString log = ss.Str();
+ capturedQuarantinedLogChunks = log.Contains("along with log chunks");
+ }
+ }
+
+ UNIT_ASSERT(capturedQuarantinedLogChunks);
+ }
+
+ Y_UNIT_TEST(OwnerKilledWhileReadingLogAndThenKillLastOwner) {
+ TestKillOwnerWhileReadingLogAndThenKillLastOwner(20);
+ }
}
}
diff --git a/ydb/library/pdisk_io/sector_map.h b/ydb/library/pdisk_io/sector_map.h
index 1bad74f093..d88eab9e6f 100644
--- a/ydb/library/pdisk_io/sector_map.h
+++ b/ydb/library/pdisk_io/sector_map.h
@@ -200,6 +200,7 @@ private:
THashMap<ui64, TString> Map;
NSectorMap::EDiskMode DiskMode = NSectorMap::DM_NONE;
THolder<NSectorMap::TSectorOperationThrottler> SectorOperationThrottler;
+ std::function<void()> ReadCallback = nullptr;
public:
TSectorMap(ui64 deviceSize = 0, NSectorMap::EDiskMode diskMode = NSectorMap::DM_NONE)
@@ -272,6 +273,10 @@ public:
if (SectorOperationThrottler.Get() != nullptr) {
SectorOperationThrottler->ThrottleRead(dataSize, dataOffset, prevOperationIsInProgress, timer.Passed() * 1000);
}
+
+ if (ReadCallback) {
+ ReadCallback();
+ }
}
void Write(const ui8 *data, i64 size, ui64 offset, bool prevOperationIsInProgress = false) {
@@ -324,6 +329,11 @@ public:
return Map.size() * NSectorMap::SECTOR_SIZE;
}
+ void SetReadCallback(std::function<void()> callback) {
+ TGuard<TTicketLock> guard(MapLock);
+ ReadCallback = callback;
+ }
+
TString ToString() const {
TStringStream str;
str << "Serial# " << Serial.Quote() << "\n";