aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-04-13 09:55:01 +0300
committeralexvru <alexvru@ydb.tech>2023-04-13 09:55:01 +0300
commit5453009c8feef430339b6d7dbeb811a8dbb73d50 (patch)
treeb84fa66015fb17319f427b0326dc31924120661a
parent6d0c4e1ba27ee43298c7bfc6c857f8190c6ab337 (diff)
downloadydb-5453009c8feef430339b6d7dbeb811a8dbb73d50.tar.gz
Fix PDiskFIT
-rw-r--r--ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.cpp152
-rw-r--r--ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.h4
-rw-r--r--ydb/core/blobstorage/ut_pdiskfit/lib/fail_injection_test.h20
3 files changed, 120 insertions, 56 deletions
diff --git a/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.cpp b/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.cpp
index 22f28131794..d9dd9505126 100644
--- a/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.cpp
+++ b/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.cpp
@@ -154,7 +154,39 @@ class TFakeVDisk
}
str << x.Lsn << ":" << x.Signature.ToString() << ":" << x.DataLen;
}
- str << "] FirstLsnToKeep# " << FirstLsnToKeep << "}";
+ str << "] FirstLsnToKeep# " << FirstLsnToKeep;
+ str << " Chunks# {";
+ first = true;
+ for (const auto& [chunkIdx, chunk] : Chunks) {
+ if (first) {
+ first = false;
+ } else {
+ str << ' ';
+ }
+ str << chunkIdx << ":";
+ switch (chunk.GetCommitState()) {
+ case ECommitState::RESERVED:
+ str << 'R';
+ break;
+
+ case ECommitState::COMMIT_IN_PROGRESS:
+ str << 'c';
+ break;
+
+ case ECommitState::COMMITTED:
+ str << 'C';
+ break;
+
+ case ECommitState::DELETE_IN_PROGRESS:
+ str << 'd';
+ break;
+
+ case ECommitState::DELETED:
+ Y_FAIL();
+ }
+ }
+ str << '}';
+ str << '}';
return str.Str();
}
@@ -273,6 +305,9 @@ class TFakeVDisk
bool StateVerified = false;
+ std::vector<ui32> ChunksToForget;
+ THashSet<ui32> OwnedChunks;
+
public:
TFakeVDisk(const TVDiskID& vdiskId, const TActorId& pdiskServiceId, ui64 pdiskGuid, TStateManager *stateManager,
TFakeVDiskParams params)
@@ -321,7 +356,7 @@ public:
}
TString SelfInfo() const {
- return TStringBuilder() << " VDiskId# " << VDiskId.ToStringWOGeneration() << " Owner# " << PDiskParams->Owner;
+ return TStringBuilder() << "VDiskId# " << VDiskId.ToStringWOGeneration() << " Owner# " << PDiskParams->Owner;
}
void Bootstrap(const TActorContext& ctx) {
@@ -335,29 +370,32 @@ public:
PDiskParams = msg->PDiskParams;
State.BlocksInChunk = PDiskParams->ChunkSize / PDiskParams->AppendBlockSize;
- THashSet<TChunkIdx> owned(msg->OwnedChunks.begin(), msg->OwnedChunks.end());
+ TStringStream str;
+ str << SelfInfo() << " starting, owned chunks# " << FormatList(msg->OwnedChunks) << Endl;
+ Cerr << str.Str();
+
+ OwnedChunks = {msg->OwnedChunks.begin(), msg->OwnedChunks.end()};
+ IssueReadLogRequest(ctx);
+ }
+
+ void VerifyOwnedChunks(const TActorContext& /*ctx*/) {
for (const auto& [idx, info] : Recovered.Chunks) {
if (info.GetCommitState() == ECommitState::COMMITTED) {
- Y_VERIFY_S(owned.count(idx), SelfInfo() << " has commited chunk# " << idx << " from Recovered.Chunks,"
+ Y_VERIFY_S(OwnedChunks.count(idx), SelfInfo() << " has commited chunk# " << idx << " from Recovered.Chunks,"
<< " but can't find it in OwnedChunks list from PDisk");
- owned.erase(idx);
+ OwnedChunks.erase(idx);
}
}
- for (auto idx : owned) {
+ for (auto idx : OwnedChunks) {
auto it = Recovered.Chunks.find(idx);
Y_VERIFY_S(it != Recovered.Chunks.end(), SelfInfo() << " has owned chunk# " << idx
- << " from PDisks's OwnedChunks, but can't find it in Recovered list");
+ << " from PDisks's OwnedChunks, but can't find it in Recovered list"
+ << " Recovered# " << Recovered.ToString());
auto& info = it->second;
Y_VERIFY(info.GetCommitState() == ECommitState::COMMIT_IN_PROGRESS ||
info.GetCommitState() == ECommitState::DELETE_IN_PROGRESS);
}
-
- TStringStream str;
- str << SelfInfo() << " starting, owned chunks# " << FormatList(msg->OwnedChunks) << Endl;
- Cerr << str.Str();
-
- IssueReadLogRequest(ctx);
}
void IssueReadLogRequest(const TActorContext& ctx) {
@@ -385,6 +423,7 @@ public:
if (msg->IsEndOfLog) {
VerifyRecoveredLog(ctx);
+ VerifyOwnedChunks(ctx);
} else {
ReadLogPosition = msg->NextPosition;
IssueReadLogRequest(ctx);
@@ -401,11 +440,14 @@ public:
// there is a record in current set (recovered from PDisk), but not in previous set (stored state); there
// may be an item in flight?
auto it = Recovered.InFlight.find(*current);
- Y_VERIFY_S(it != Recovered.InFlight.end(), "unexpected log record " << SelfInfo() << " Lsn# " << current->Lsn
- << " Signature# " << current->Signature.ToString());
- Y_VERIFY_S(it->DataLen == current->DataLen && it->Checksum == current->Checksum &&
- it->Signature == current->Signature, SelfInfo() << "Lsn# " << it->Lsn
+ Y_VERIFY_S(it != Recovered.InFlight.end() || current->Lsn < Recovered.FirstLsnToKeep,
+ "unexpected log record " << SelfInfo() << " Lsn# " << current->Lsn
+ << " Signature# " << current->Signature.ToString());
+ if (it != Recovered.InFlight.end()) {
+ Y_VERIFY_S(it->DataLen == current->DataLen && it->Checksum == current->Checksum &&
+ it->Signature == current->Signature, SelfInfo() << " Lsn# " << it->Lsn
<< " InFlightData# " << it->DataLen << " StoredData# " << current->DataLen);
+ }
} else if (prev) {
// lost item
if (prev->Lsn >= Recovered.FirstLsnToKeep) {
@@ -475,8 +517,9 @@ public:
ui64 writeLogScore = 100;
ui64 allocateScore = State.Chunks.size() < 20 ? 10 : 0;
ui64 writeScore = State.Chunks.empty() ? 0 : 5;
+ ui64 forgetScore = ChunksToForget.empty() ? 0 : ChunksToForget.size() < 100 ? 10 : 1000;
- ui64 totalScore = writeLogScore + allocateScore + writeScore;
+ ui64 totalScore = writeLogScore + allocateScore + writeScore + forgetScore;
if (!totalScore) {
// nothing to do
break;
@@ -491,6 +534,12 @@ public:
IssueAllocateRequest(ctx);
} else if ((option -= allocateScore) < writeScore) {
IssueWriteRequest(ctx);
+ } else if ((option -= writeScore) < forgetScore) {
+ const size_t index = RandomNumber(ChunksToForget.size());
+ ui32& chunk = ChunksToForget[index];
+ SendPDiskRequest(ctx, new NPDisk::TEvChunkForget(PDiskParams->Owner, PDiskParams->OwnerRound, {chunk}), [] {});
+ std::swap(chunk, ChunksToForget.back());
+ ChunksToForget.pop_back();
} else {
Y_FAIL("unexpected option");
}
@@ -513,26 +562,23 @@ public:
TState::TChunkInfo& chunk = pair.second;
switch (chunk.GetCommitState()) {
case ECommitState::RESERVED:
- // reserved chunk is a subject for commit; 1% chance to commit chunk
- if (RandomNumber<double>() < 0.01) {
+ // reserved chunk is a subject for commit; 10% chance to commit chunk
+ if (RandomNumber<double>() < 0.1) {
info->CommitChunks.push_back(chunkIdx);
chunk.SetCommitState(ECommitState::COMMIT_IN_PROGRESS);
+ break;
}
- break;
-
+ [[fallthrough]];
case ECommitState::COMMITTED:
- // committed chunk is a subject for deletion; 0.5% change to delete chunk
- if (RandomNumber<double>() < 0.005) {
- bool hasWrites = false;
- for (const TWriteRecord& w : State.WritesInFlight) {
- if (w.ChunkIdx == chunkIdx) {
- hasWrites = true;
- break;
+ // committed chunk is a subject for deletion; 10% chance to delete chunk
+ if (RandomNumber<double>() < 0.1) {
+ for (auto it = State.WritesInFlight.begin(); it != State.WritesInFlight.end(); ) {
+ if (it->ChunkIdx == chunkIdx) {
+ it = State.WritesInFlight.erase(it);
+ } else {
+ ++it;
}
}
- if (hasWrites) {
- break;
- }
info->DeleteChunks.push_back(chunkIdx);
chunk.SetCommitState(ECommitState::DELETE_IN_PROGRESS);
}
@@ -545,7 +591,7 @@ public:
NPDisk::TCommitRecord cr;
- // advance LSN every 30000 items avg
+ // advance LSN every 500 items avg
if (Lsn > Params.LsnToKeepCount && RandomNumber<double>() < Params.LogCutProbability) {
cr.FirstLsnToKeep = Lsn - Params.LsnToKeepCount;
cr.IsStartingPoint = true; // make starting point if we cut log
@@ -554,6 +600,7 @@ public:
// fill in commit/delete records
cr.CommitChunks = info->CommitChunks;
cr.DeleteChunks = info->DeleteChunks;
+ cr.DeleteToDecommitted = RandomNumber(2u);
cr.IsStartingPoint = cr.IsStartingPoint || cr.CommitChunks || cr.DeleteChunks;
if (cr.FirstLsnToKeep) {
@@ -562,19 +609,17 @@ public:
Cerr << str.Str();
}
- TStringStream msg;
- msg << "TEvLog " << SelfInfo() << " Lsn# " << Lsn << " Size# " << info->DataLen;
- auto printChunks = [] (TVector<TChunkIdx> chunks) {
- bool first = true;
- TStringStream str;
- for (TChunkIdx chunk : chunks) {
- str << (first ? first = false, "" : " ") << chunk;
- }
- return str.Str();
- };
- msg << " Commit# " << printChunks(cr.CommitChunks) << " Delete# " << printChunks(cr.DeleteChunks)
- << " IsStartingPoint# " << (cr.IsStartingPoint ? "true" : "false") << Endl;
+ if (cr.DeleteToDecommitted) {
+ ChunksToForget.insert(ChunksToForget.end(), cr.DeleteChunks.begin(), cr.DeleteChunks.end());
+ }
+
if (cr.IsStartingPoint || cr.CommitChunks || cr.DeleteChunks) {
+ TStringStream msg;
+ msg << "TEvLog " << SelfInfo() << " Lsn# " << Lsn << " Size# " << info->DataLen
+ << " Commit# " << FormatList(cr.CommitChunks) << " Delete# " << FormatList(cr.DeleteChunks)
+ << " IsStartingPoint# " << (cr.IsStartingPoint ? "true" : "false")
+ << " DeleteToDecommitted# " << (cr.DeleteToDecommitted ? "true" : "false")
+ << Endl;
Cerr << msg.Str();
}
@@ -615,6 +660,7 @@ public:
// apply chunk commits
for (TChunkIdx chunkIdx : info->CommitChunks) {
TState::TChunkInfo& chunk = State.GetChunk(chunkIdx);
+ Cerr << (TStringBuilder() << SelfInfo() << " Chunk COMMITTED ChunkIdx# " << chunkIdx << Endl);
chunk.SetCommitState(success ? ECommitState::COMMITTED : ECommitState::RESERVED);
}
@@ -625,6 +671,7 @@ public:
TState::TChunkInfo& chunk = it->second;
chunk.SetCommitState(success ? ECommitState::DELETED : ECommitState::COMMITTED);
if (chunk.GetCommitState() == ECommitState::DELETED) {
+ Cerr << (TStringBuilder() << SelfInfo() << " Chunk DELETED ChunkIdx# " << chunkIdx << Endl);
State.Chunks.erase(it);
}
}
@@ -749,6 +796,11 @@ public:
void Handle(NPDisk::TEvChunkWriteResult::TPtr& ev, const TActorContext& ctx) {
auto *msg = ev->Get();
+ if (const auto it = State.Chunks.find(msg->ChunkIdx); it == State.Chunks.end() ||
+ it->second.GetCommitState() == ECommitState::DELETE_IN_PROGRESS) {
+ return Activity(ctx); // ignore this write result
+ }
+
Y_VERIFY(msg->Status == NKikimrProto::OK);
StateManager->ExecuteConsistentAction([&] {
@@ -810,6 +862,7 @@ public:
|| chunk.GetCommitState() == ECommitState::DELETE_IN_PROGRESS,
"ChunkIdx# %" PRIu32 " CommitState# %" PRIu32, msg->ChunkIdx,
static_cast<ui32>(chunk.GetCommitState()));
+ const bool wasDeleteInProgress = chunk.GetCommitState() == ECommitState::DELETE_IN_PROGRESS;
chunk.SetCommitState(ECommitState::COMMITTED);
ui32 offset = 0;
@@ -838,7 +891,7 @@ public:
break;
}
}
- Y_VERIFY(it != Recovered.WritesInFlight.end() || chunk.VerifyChecksum(i, checksum),
+ Y_VERIFY(it != Recovered.WritesInFlight.end() || chunk.VerifyChecksum(i, checksum) || wasDeleteInProgress,
"inconsistent chunk data ChunkIdx# %" PRIu32 " OffsetInBlocks# %" PRIu32 " Used# %s"
" Checksum# %08" PRIx32 " StoredChecksum# %08" PRIx32 " WritesInFlight# %s", msg->ChunkIdx, i,
chunk.IsUsed(i) ? "true" : "false", checksum, chunk.GetChecksum(i), s.Str().data());
@@ -889,6 +942,12 @@ public:
return data;
}
+ void Handle(NPDisk::TEvChunkForgetResult::TPtr& ev, const TActorContext& /*ctx*/) {
+ if (ev->Get()->Status != NKikimrProto::OK) {
+ Cerr << (TStringBuilder() << VDiskId << " TEvChunkForgetResult# " << ev->Get()->ToString() << Endl);
+ }
+ }
+
STFUNC(StateFunc) {
switch (const ui32 type = ev->GetTypeRewrite()) {
CFunc(TEvents::TSystem::Bootstrap, Bootstrap);
@@ -898,6 +957,7 @@ public:
HFunc(NPDisk::TEvChunkReserveResult, Handle);
HFunc(NPDisk::TEvChunkWriteResult, Handle);
HFunc(NPDisk::TEvChunkReadResult, Handle);
+ HFunc(NPDisk::TEvChunkForgetResult, Handle);
default: Y_FAIL("unexpected message 0x%08" PRIx32, type);
}
}
diff --git a/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.h b/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.h
index f9e1bcf05bb..e046fdb8472 100644
--- a/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.h
+++ b/ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.h
@@ -19,10 +19,10 @@ struct TFakeVDiskParams {
// LogRecord size distribution
ui32 SizeMin = 1000;
- ui32 SizeMax = 2000;
+ ui32 SizeMax = 100000;
ui32 LsnToKeepCount = 1000;
- double LogCutProbability = 1.0 / 30000;
+ double LogCutProbability = 1.0 / 500;
};
diff --git a/ydb/core/blobstorage/ut_pdiskfit/lib/fail_injection_test.h b/ydb/core/blobstorage/ut_pdiskfit/lib/fail_injection_test.h
index 7b7abf4818e..c53155f3669 100644
--- a/ydb/core/blobstorage/ut_pdiskfit/lib/fail_injection_test.h
+++ b/ydb/core/blobstorage/ut_pdiskfit/lib/fail_injection_test.h
@@ -37,7 +37,11 @@ public:
AtomicSet(FailCounter, failCounter);
}
- void Inject(ui64 /*cookie*/) {
+ void Inject(ui64 cookie) {
+ if (cookie == 2 && RandomNumber(10u) == 0) {
+ AtomicSet(FailCounter, 1 + RandomNumber(24u));
+ }
+
TAtomicBase result = AtomicDecrement(FailCounter);
if (result < 0) {
// overshoot, return one position back
@@ -91,10 +95,10 @@ ui32 GenerateFailCounter(bool frequentFails) {
double p = (rng() % (1000 * 1000 * 1000)) / 1e9;
- if (frequentFails) {
+ if (!frequentFails) {
return p < 0.05 ? rng() % 10 + 1
- : p < 0.10 ? rng() % 1000 + 1000 :
- rng() % 5000 + 5000;
+ : p < 0.10 ? rng() % 100 + 100 :
+ rng() % 1000 + 1000;
} else {
return p < 0.9 ? rng() % 10 + 1
: p < 0.99 ? rng() % 100 + 100 :
@@ -183,10 +187,10 @@ struct TPDiskFailureInjectionTest {
setup->NodeId = 1;
setup->ExecutorsCount = 4; // system, user, io, batch
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
- setup->Executors[0] = new TBasicExecutorPool(AppData->SystemPoolId, 8, 10);
- setup->Executors[1] = new TBasicExecutorPool(AppData->UserPoolId, 8, 10);
+ setup->Executors[0] = new TBasicExecutorPool(AppData->SystemPoolId, 16, 10);
+ setup->Executors[1] = new TBasicExecutorPool(AppData->UserPoolId, 1, 10);
setup->Executors[2] = new TIOExecutorPool(AppData->IOPoolId, 10);
- setup->Executors[3] = new TBasicExecutorPool(AppData->BatchPoolId, 8, 10);
+ setup->Executors[3] = new TBasicExecutorPool(AppData->BatchPoolId, 1, 10);
setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 100));
// initialize logger settings
@@ -214,7 +218,7 @@ struct TPDiskFailureInjectionTest {
// create/register logger actor
auto logger = std::make_unique<TLoggerActor>(loggerSettings, CreateStderrBackend(),
Counters->GetSubgroup("logger", "counters"));
- setup->LocalServices.emplace_back(loggerId, TActorSetupCmd(logger.release(), TMailboxType::Simple, 0));
+ setup->LocalServices.emplace_back(loggerId, TActorSetupCmd(logger.release(), TMailboxType::Simple, AppData->IOPoolId));
// create and then initialize actor system
ActorSystem = std::make_unique<TActorSystem>(setup, AppData.get(), loggerSettings);