aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorserg-belyakov <serg-belyakov@yandex-team.com>2022-09-01 14:26:03 +0300
committerserg-belyakov <serg-belyakov@yandex-team.com>2022-09-01 14:26:03 +0300
commit37045415adbd50120bacf7a5bb00a69d1c93849d (patch)
tree462d2e38e2c7fc790c5b9b286f9175ab0ad296af
parent9da522eaf87dd50f3501b0e472fd2ec03d3f2076 (diff)
downloadydb-37045415adbd50120bacf7a5bb00a69d1c93849d.tar.gz
Race test,
Separate race tests
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp553
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.cpp78
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h284
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp297
-rw-r--r--ydb/core/blobstorage/pdisk/ut/CMakeLists.darwin.txt2
-rw-r--r--ydb/core/blobstorage/pdisk/ut/CMakeLists.linux.txt2
6 files changed, 664 insertions, 552 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
index cc7e24858a8..4266d75266d 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
@@ -2,6 +2,7 @@
#include "blobstorage_pdisk_abstract.h"
#include "blobstorage_pdisk_impl.h"
+#include "blobstorage_pdisk_ut_env.h"
#include <ydb/core/blobstorage/crypto/default.h>
#include <ydb/core/testlib/actors/test_runtime.h>
@@ -42,124 +43,6 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
UNIT_ASSERT(NKikimrBlobStorage::TPDiskState::DeviceIoError == 13);
}
-struct TActorTestContext {
-private:
- std::optional<TActorId> PDiskActor;
- THolder<TTestActorRuntime> Runtime;
- std::shared_ptr<NPDisk::IIoContextFactory> IoContext;
- NPDisk::TPDisk *PDisk = nullptr;
-
-public:
- TActorId Sender;
- NPDisk::TKey MainKey = NPDisk::YdbDefaultPDiskSequence;
- TTestContext TestCtx{false, /*use sector map*/ true};
-
- TIntrusivePtr<TPDiskConfig> DefaultPDiskConfig(bool isBad) {
- TString path;
- EntropyPool().Read(&TestCtx.PDiskGuid, sizeof(TestCtx.PDiskGuid));
- ui64 formatGuid = TestCtx.PDiskGuid + static_cast<ui64>(isBad);
- FormatPDiskForTest(path, formatGuid, MIN_CHUNK_SIZE, false, TestCtx.SectorMap);
-
- ui64 pDiskCategory = 0;
- TIntrusivePtr<TPDiskConfig> pDiskConfig = new TPDiskConfig(path, TestCtx.PDiskGuid, 1, pDiskCategory);
- pDiskConfig->GetDriveDataSwitch = NKikimrBlobStorage::TPDiskConfig::DoNotTouch;
- pDiskConfig->WriteCacheSwitch = NKikimrBlobStorage::TPDiskConfig::DoNotTouch;
- pDiskConfig->ChunkSize = MIN_CHUNK_SIZE;
- pDiskConfig->SectorMap = TestCtx.SectorMap;
- pDiskConfig->EnableSectorEncryption = !pDiskConfig->SectorMap;
- return pDiskConfig;
- }
-
- TActorTestContext(bool isBad)
- : Runtime(new TTestActorRuntime(1, true))
- {
- auto appData = MakeHolder<TAppData>(0, 0, 0, 0, TMap<TString, ui32>(), nullptr, nullptr, nullptr, nullptr);
- IoContext = std::make_shared<NPDisk::TIoContextFactoryOSS>();
- appData->IoContextFactory = IoContext.get();
-
- Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend());
- Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}});
- Runtime->SetLogPriority(NKikimrServices::BS_PDISK, NLog::PRI_NOTICE);
- Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SYSLOG, NLog::PRI_NOTICE);
- Runtime->SetLogPriority(NKikimrServices::BS_PDISK_TEST, NLog::PRI_DEBUG);
- Sender = Runtime->AllocateEdgeActor();
-
- TIntrusivePtr<TPDiskConfig> cfg = DefaultPDiskConfig(isBad);
- UpdateConfigRecreatePDisk(cfg);
- }
-
- TIntrusivePtr<TPDiskConfig> GetPDiskConfig() {
- return GetPDisk()->Cfg;
- }
-
- void UpdateConfigRecreatePDisk(TIntrusivePtr<TPDiskConfig> cfg) {
- if (PDiskActor) {
- TestResponce<NPDisk::TEvYardControlResult>(
- new NPDisk::TEvYardControl(NPDisk::TEvYardControl::PDiskStop, nullptr),
- NKikimrProto::OK);
- PDisk = nullptr;
- Runtime->Send(new IEventHandle(*PDiskActor, Sender, new TKikimrEvents::TEvPoisonPill));
- }
-
- auto mainCounters = TIntrusivePtr<::NMonitoring::TDynamicCounters>(new ::NMonitoring::TDynamicCounters());
- IActor* pDiskActor = CreatePDisk(cfg.Get(), MainKey, mainCounters);
- PDiskActor = Runtime->Register(pDiskActor);
- }
-
- void Send(IEventBase* ev) {
- Runtime->Send(new IEventHandle(*PDiskActor, Sender, ev));
- }
-
- NPDisk::TPDisk *GetPDisk() {
- if (!PDisk) {
- // To be sure that pdisk actor is in StateOnline
- TestResponce<NPDisk::TEvYardControlResult>(
- new NPDisk::TEvYardControl(NPDisk::TEvYardControl::PDiskStart, &MainKey),
- NKikimrProto::OK);
-
- const auto evControlRes = TestResponce<NPDisk::TEvYardControlResult>(
- new NPDisk::TEvYardControl(NPDisk::TEvYardControl::GetPDiskPointer, nullptr),
- NKikimrProto::OK);
- PDisk = reinterpret_cast<NPDisk::TPDisk*>(evControlRes->Cookie);
- }
- return PDisk;
- }
-
- template<typename T>
- auto SafeRunOnPDisk(T&& f) {
- TGuard<TMutex> g(GetPDisk()->StateMutex);
- return f(GetPDisk());
- }
-
- void RestartPDiskSync() {
- TestResponce<NPDisk::TEvYardControlResult>(
- new NPDisk::TEvYardControl(NPDisk::TEvYardControl::PDiskStop, nullptr),
- NKikimrProto::OK);
- PDisk = nullptr;
- // wait initialization and update this->PDisk
- GetPDisk();
- }
-
- template<typename TRes>
- THolder<TRes> Recv() {
- return Runtime->GrabEdgeEvent<TRes>();
- }
-
- template<typename TRes>
- THolder<TRes> TestResponce(IEventBase* ev, NKikimrProto::EReplyStatus status) {
- if (ev) {
- Send(ev);
- }
- THolder<TRes> evRes = Recv<TRes>();
- UNIT_ASSERT_C(evRes->Status == status, evRes->ToString());
- UNIT_ASSERT(status == NKikimrProto::OK || !evRes->ErrorReason.empty());
-
- // Test that all ToString methods don't VERIFY
- Cnull << evRes->ToString();
- return evRes;
- }
-};
-
Y_UNIT_TEST(TestPDiskActorErrorState) {
TActorTestContext testCtx(true);
@@ -222,65 +105,6 @@ public:
testCtx.Send(new NActors::TEvents::TEvPoisonPill());
}
- void TestChunkWriteReleaseRun() {
- TActorTestContext testCtx(false);
-
- const TVDiskID vDiskID(0, 1, 0, 0, 0);
- const auto evInitRes = testCtx.TestResponce<NPDisk::TEvYardInitResult>(
- new NPDisk::TEvYardInit(2, vDiskID, testCtx.TestCtx.PDiskGuid),
- NKikimrProto::OK);
- const auto evReserveRes = testCtx.TestResponce<NPDisk::TEvChunkReserveResult>(
- new NPDisk::TEvChunkReserve(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 1),
- NKikimrProto::OK);
- UNIT_ASSERT(evReserveRes->ChunkIds.size() == 1);
-
- const ui32 reservedChunk = evReserveRes->ChunkIds.front();
- NPDisk::TCommitRecord commitRecord;
- commitRecord.CommitChunks.push_back(reservedChunk);
- testCtx.TestResponce<NPDisk::TEvLogResult>(
- new NPDisk::TEvLog(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 0, commitRecord,
- TString(), TLsnSeg(1, 1), nullptr),
- NKikimrProto::OK);
-
- const auto evControlRes = testCtx.TestResponce<NPDisk::TEvYardControlResult>(
- new NPDisk::TEvYardControl(NPDisk::TEvYardControl::GetPDiskPointer, nullptr),
- NKikimrProto::OK);
- auto *pDisk = reinterpret_cast<NPDisk::TPDisk*>(evControlRes->Cookie);
- pDisk->PDiskThread.StopSync();
-
- {
- NPDisk::TCommitRecord commitRecord;
- commitRecord.DeleteChunks.push_back(reservedChunk);
- NPDisk::TEvLog ev(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 0, commitRecord,
- TString(), TLsnSeg(2, 2), nullptr);
- NPDisk::TLogWrite *log = new NPDisk::TLogWrite(ev, testCtx.Sender, 0, {}, {});
- bool ok = pDisk->PreprocessRequest(log);
- UNIT_ASSERT(ok);
- pDisk->RouteRequest(log);
- }
- pDisk->ProcessLogWriteQueueAndCommits();
-
- {
- TString chunkWriteData = PrepareData(1024);
- NPDisk::TEvChunkWrite ev(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, reservedChunk,
- 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(chunkWriteData), nullptr, false, 0);
- NPDisk::TChunkWrite *chunkWrite = new NPDisk::TChunkWrite(ev, testCtx.Sender, {}, {});
- bool ok = pDisk->PreprocessRequest(chunkWrite);
- UNIT_ASSERT(!ok);
- }
-
- pDisk->ProcessChunkWriteQueue();
-
- testCtx.TestResponce<NPDisk::TEvLogResult>(
- nullptr,
- NKikimrProto::OK);
- testCtx.TestResponce<NPDisk::TEvChunkWriteResult>(
- nullptr,
- NKikimrProto::ERROR);
-
- testCtx.Send(new NActors::TEvents::TEvPoisonPill());
- }
-
Y_UNIT_TEST(TestChunkWriteRelease) {
for (ui32 i = 0; i < 16; ++i) {
TestChunkWriteReleaseRun();
@@ -324,23 +148,6 @@ public:
}
}
-struct TVDiskIDOwnerRound {
- TVDiskID VDiskID;
- ui64 OwnerRound;
-};
-
-void RecreateOwner(TActorTestContext& testCtx, TVDiskIDOwnerRound& vdisk) {
- testCtx.TestResponce<NPDisk::TEvSlayResult>(
- new NPDisk::TEvSlay(vdisk.VDiskID, vdisk.OwnerRound + 1, 0, 0),
- NKikimrProto::OK);
-
- const auto evInitRes = testCtx.TestResponce<NPDisk::TEvYardInitResult>(
- new NPDisk::TEvYardInit(vdisk.OwnerRound + 1, vdisk.VDiskID, testCtx.TestCtx.PDiskGuid),
- NKikimrProto::OK);
-
- vdisk.OwnerRound = evInitRes->PDiskParams->OwnerRound;
-}
-
Y_UNIT_TEST(TestPDiskManyOwnersInitiation) {
TActorTestContext testCtx(false);
@@ -375,152 +182,6 @@ void RecreateOwner(TActorTestContext& testCtx, TVDiskIDOwnerRound& vdisk) {
}
}
-enum class EChunkState {
- UNKNOWN,
- RESERVED,
- COMMIT_INFLIGHT,
- COMMITTED,
- DELETE_INFLIGHT,
- DELETED
-};
-
-struct TVDiskMock {
- static std::atomic<ui64> Idx;
- static std::atomic<ui64> OwnerRound;
-
- TActorTestContext *TestCtx;
- const TVDiskID VDiskID;
- TIntrusivePtr<TPDiskParams> PDiskParams;
- ui64 LastUsedLsn = 0;
- ui64 FirstLsnToKeep = 1;
-
- TMap<EChunkState, TSet<TChunkIdx>> Chunks;
-
- TVDiskMock(TActorTestContext *testCtx)
- : TestCtx(testCtx)
- , VDiskID(Idx.fetch_add(1), 1, 0, 0, 0)
- {}
-
- TLsnSeg GetLsnSeg() {
- ++LastUsedLsn;
- return {LastUsedLsn, LastUsedLsn};
- };
-
- void InitFull() {
- Init();
- ReadLog();
- SendEvLogImpl(1, {}, true);
- }
-
- void Init() {
- const auto evInitRes = TestCtx->TestResponce<NPDisk::TEvYardInitResult>(
- new NPDisk::TEvYardInit(OwnerRound.fetch_add(1), VDiskID, TestCtx->TestCtx.PDiskGuid),
- NKikimrProto::OK);
- PDiskParams = evInitRes->PDiskParams;
-
- TSet<TChunkIdx> commited = Chunks[EChunkState::COMMITTED];
- for (TChunkIdx chunk : evInitRes->OwnedChunks) {
- UNIT_ASSERT_C(commited.count(chunk), "misowned chunk# " << chunk);
- commited.erase(chunk);
- }
- UNIT_ASSERT_C(commited.empty(), "there are leaked chunks# " << FormatList(commited));
- }
-
-
- void ReserveChunk() {
- const auto evReserveRes = TestCtx->TestResponce<NPDisk::TEvChunkReserveResult>(
- new NPDisk::TEvChunkReserve(PDiskParams->Owner, PDiskParams->OwnerRound, 1),
- NKikimrProto::OK);
- UNIT_ASSERT(evReserveRes->ChunkIds.size() == 1);
- const ui32 reservedChunk = evReserveRes->ChunkIds.front();
- Chunks[EChunkState::RESERVED].emplace(reservedChunk);
- }
-
- void CommitReservedChunks() {
- auto& reservedChunks = Chunks[EChunkState::RESERVED];
- NPDisk::TCommitRecord rec;
- rec.CommitChunks = TVector<TChunkIdx>(reservedChunks.begin(), reservedChunks.end());
- SendEvLogImpl(1, rec);
- Chunks[EChunkState::COMMITTED].insert(reservedChunks.begin(), reservedChunks.end());
- reservedChunks.clear();
- }
-
- void DeleteCommitedChunks() {
- auto& commited = Chunks[EChunkState::COMMITTED];
- NPDisk::TCommitRecord rec;
- rec.DeleteChunks = TVector<TChunkIdx>(commited.begin(), commited.end());
- SendEvLogImpl(1, rec);
- Chunks[EChunkState::DELETED].insert(commited.begin(), commited.end());
- commited.clear();
- }
-
- ui64 ReadLog(std::function<void(const NPDisk::TLogRecord&)> logResCallback = {}) {
- ui64 logRecordsRead = 0;
-
- NPDisk::TLogPosition position{0, 0};
- bool endOfLog = false;
- do {
- UNIT_ASSERT(PDiskParams);
- auto logReadRes = TestCtx->TestResponce<NPDisk::TEvReadLogResult>(
- new NPDisk::TEvReadLog(PDiskParams->Owner, PDiskParams->OwnerRound, position),
- NKikimrProto::OK);
- UNIT_ASSERT(position == logReadRes->Position);
- for (const NPDisk::TLogRecord& rec : logReadRes->Results) {
- ++logRecordsRead;
- if (logResCallback) {
- logResCallback(rec);
- }
- LastUsedLsn = Max(LastUsedLsn, rec.Lsn);
- }
- position = logReadRes->NextPosition;
- endOfLog = logReadRes->IsEndOfLog;
- } while (!endOfLog);
-
- return logRecordsRead;
- }
-
- void SendEvLogSync(const ui64 size = 128) {
- SendEvLogImpl(size, {}, false);
- }
-
- void CutLogAllButOne() {
- SendEvLogImpl(1, LastUsedLsn + 1, true);
- }
-
- ui64 OwnedLogRecords() const {
- return LastUsedLsn + 1 - FirstLsnToKeep;
- }
-
-private:
- void SendEvLogImpl(const ui64 size, TMaybe<NPDisk::TCommitRecord> commitRec) {
- auto evLog = MakeHolder<NPDisk::TEvLog>(PDiskParams->Owner, PDiskParams->OwnerRound, 0, PrepareData(size),
- GetLsnSeg(), nullptr);
-
- if (commitRec) {
- evLog->Signature.SetCommitRecord();
- evLog->CommitRecord = std::move(*commitRec);
- }
-
- TestCtx->TestResponce<NPDisk::TEvLogResult>(evLog.Release(), NKikimrProto::OK);
- }
-
- void SendEvLogImpl(const ui64 size, TMaybe<ui64> firstLsnToKeep, bool isStartingPoint) {
-
- TMaybe<NPDisk::TCommitRecord> rec;
-
- if (firstLsnToKeep || isStartingPoint) {
- rec = NPDisk::TCommitRecord();
- rec->FirstLsnToKeep = firstLsnToKeep.GetOrElse(0);
- FirstLsnToKeep = Max(FirstLsnToKeep, firstLsnToKeep.GetOrElse(0));
- rec->IsStartingPoint = isStartingPoint;
- }
- SendEvLogImpl(size, rec);
- }
-};
-
-std::atomic<ui64> TVDiskMock::Idx = 0;
-std::atomic<ui64> TVDiskMock::OwnerRound = 2;
-
Y_UNIT_TEST(TestVDiskMock) {
TActorTestContext testCtx(false);
TVDiskMock mock(&testCtx);
@@ -1023,217 +684,5 @@ std::atomic<ui64> TVDiskMock::OwnerRound = 2;
new NPDisk::TEvCheckSpace(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound),
NKikimrProto::CORRUPTED);
}
-
- Y_UNIT_TEST(KillOwnerWhileDeletingChunk) {
- THPTimer timer;
- ui32 timeLimit = 20;
- while (timer.Passed() < timeLimit) {
- TActorTestContext testCtx(false);
-
- auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) {
- auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1),
- mock.GetLsnSeg(), nullptr);
- evLog->Signature.SetCommitRecord();
- evLog->CommitRecord = std::move(rec);
- testCtx.Send(evLog.Release());
- };
-
- TVDiskMock mock(&testCtx);
- mock.Init();
-
- ui32 vdisksNum = 100;
- std::vector<TVDiskMock> mocks;
- for (ui32 i = 0; i < vdisksNum; ++i) {
- mocks.push_back(TVDiskMock(&testCtx));
- mocks[i].Init();
- }
-
- ui32 reservedChunks = 10;
-
- for (ui32 i = 0; i < reservedChunks; ++i) {
- mock.ReserveChunk();
- }
- mock.CommitReservedChunks();
-
- while (mock.Chunks[EChunkState::COMMITTED].size() > 0) {
- auto it = mock.Chunks[EChunkState::COMMITTED].begin();
- NPDisk::TCommitRecord rec;
- rec.DeleteChunks.push_back(*it);
- logNoTest(mock, rec);
- mock.Chunks[EChunkState::COMMITTED].erase(it);
- }
-
- testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound));
-
- for (ui32 c = 0; ; c = (c + 1) % mocks.size()) {
- testCtx.Send(new NPDisk::TEvChunkReserve(mocks[c].PDiskParams->Owner, mocks[c].PDiskParams->OwnerRound, 1));
- THolder<NPDisk::TEvChunkReserveResult> evRes = testCtx.Recv<NPDisk::TEvChunkReserveResult>();
- if (!evRes || evRes->Status != NKikimrProto::OK) {
- break;
- }
- const ui32 reservedChunk = evRes->ChunkIds.front();
- auto& reservedChunks = mocks[c].Chunks[EChunkState::RESERVED];
- reservedChunks.emplace(reservedChunk);
-
- NPDisk::TCommitRecord rec;
- rec.CommitChunks.push_back(*reservedChunks.begin());
- logNoTest(mocks[c], rec);
- reservedChunks.clear();
- }
- testCtx.Recv<NPDisk::TEvHarakiriResult>();
- }
- }
-
- Y_UNIT_TEST(KillOwnerWhileDeletingChunkWithInflight) {
- THPTimer timer;
- ui32 timeLimit = 20;
- while (timer.Passed() < timeLimit) {
- TActorTestContext testCtx(false);
-
- auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) {
- auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1),
- mock.GetLsnSeg(), nullptr);
- evLog->Signature.SetCommitRecord();
- evLog->CommitRecord = std::move(rec);
- testCtx.Send(evLog.Release());
- };
-
- TVDiskMock mock(&testCtx);
- mock.Init();
-
- ui32 vdisksNum = 100;
- std::vector<TVDiskMock> mocks;
- for (ui32 i = 0; i < vdisksNum; ++i) {
- mocks.push_back(TVDiskMock(&testCtx));
- mocks[i].Init();
- }
-
- ui32 reservedChunks = 10;
- for (ui32 i = 0; i < reservedChunks; ++i) {
- mock.ReserveChunk();
- }
- mock.CommitReservedChunks();
- TVector<TChunkIdx> chunkIds(mock.Chunks[EChunkState::COMMITTED].begin(), mock.Chunks[EChunkState::COMMITTED].end());
-
- ui32 inflight = 300;
-
- while (mock.Chunks[EChunkState::COMMITTED].size() > 0) {
- auto it = mock.Chunks[EChunkState::COMMITTED].begin();
- for (ui32 i = 0; i < inflight; ++i) {
- TString data = "HATE. LET ME TELL YOU HOW MUCH I'VE COME TO HATE YOU SINCE I BEGAN TO LIVE...";
- testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
- *it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), nullptr, false, 0));
- }
- NPDisk::TCommitRecord rec;
- rec.DeleteChunks.push_back(*it);
- logNoTest(mock, rec);
- mock.Chunks[EChunkState::COMMITTED].erase(it);
- }
- mock.Chunks[EChunkState::COMMITTED].clear();
-
- testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound));
-
- for (ui32 c = 0; ; c = (c + 1) % mocks.size()) {
- testCtx.Send(new NPDisk::TEvChunkReserve(mocks[c].PDiskParams->Owner, mocks[c].PDiskParams->OwnerRound, 1));
- THolder<NPDisk::TEvChunkReserveResult> evRes = testCtx.Recv<NPDisk::TEvChunkReserveResult>();
- if (!evRes || evRes->Status != NKikimrProto::OK) {
- break;
- }
- const ui32 reservedChunk = evRes->ChunkIds.front();
- auto& reservedChunks = mocks[c].Chunks[EChunkState::RESERVED];
- reservedChunks.emplace(reservedChunk);
-
- NPDisk::TCommitRecord rec;
- rec.CommitChunks.push_back(*reservedChunks.begin());
- logNoTest(mocks[c], rec);
- reservedChunks.clear();
- }
- testCtx.Recv<NPDisk::TEvHarakiriResult>();
- }
- }
-
- Y_UNIT_TEST(DecommitWithInflight) {
- THPTimer timer;
- ui32 timeLimit = 20;
- while (timer.Passed() < timeLimit) {
- TActorTestContext testCtx(false);
- ui32 dataSize = 1024;
-
- auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) {
- auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1),
- mock.GetLsnSeg(), nullptr);
- evLog->Signature.SetCommitRecord();
- evLog->CommitRecord = std::move(rec);
- testCtx.Send(evLog.Release());
- };
-
- auto sendManyReads = [&](TVDiskMock& mock, TChunkIdx chunk, ui32 number, ui64& cookie) {
- for (ui32 i = 0; i < number; ++i) {
- testCtx.Send(new NPDisk::TEvChunkRead(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
- chunk, 0, dataSize, 0, (void*)(cookie++)));
- }
- };
-
- auto sendManyWrites = [&](TVDiskMock& mock, TChunkIdx chunk, ui32 number, ui64& cookie) {
- for (ui32 i = 0; i < number; ++i) {
- TString data = PrepareData(dataSize);
- testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
- chunk, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), (void*)(cookie++), false, 0));
- }
- };
-
- TVDiskMock mock(&testCtx);
- mock.Init();
-
- ui32 reservedChunks = 10;
- for (ui32 i = 0; i < reservedChunks; ++i) {
- mock.ReserveChunk();
- }
-
- {
- auto& chunkIds = mock.Chunks[EChunkState::COMMITTED];
- for (auto it = chunkIds.begin(); it != chunkIds.end(); ++it) {
- TString data = PrepareData(dataSize);
- testCtx.TestResponce<NPDisk::TEvChunkWriteResult>(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
- *it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), (void*)10, false, 0),
- NKikimrProto::OK);
- }
- }
-
- mock.CommitReservedChunks();
-
- ui32 inflight = 100;
- auto& chunkIds = mock.Chunks[EChunkState::COMMITTED];
-
- ui64 cookie = 0;
- for (auto it = chunkIds.begin(); it != chunkIds.end(); ++it) {
- sendManyWrites(mock, *it, inflight, cookie);
- sendManyReads(mock, *it, inflight, cookie);
- NPDisk::TCommitRecord rec;
- rec.DeleteChunks.push_back(*it);
- rec.DeleteToDecommitted = true;
- logNoTest(mock, rec);
- sendManyWrites(mock, *it, inflight, cookie);
- sendManyReads(mock, *it, inflight, cookie);
- }
- mock.Chunks[EChunkState::COMMITTED].clear();
-
-
- for (ui32 i = 0; i < inflight * 2 * reservedChunks; ++i) {
- {
- auto res = testCtx.Recv<NPDisk::TEvChunkReadResult>();
- UNIT_ASSERT_VALUES_EQUAL_C(res->Status, NKikimrProto::OK, res->ToString());
- }
- {
- auto res = testCtx.Recv<NPDisk::TEvChunkWriteResult>();
- UNIT_ASSERT_VALUES_EQUAL_C(res->Status, NKikimrProto::OK, res->ToString());
- }
- }
- for (ui32 i = 0; i < reservedChunks; ++i) {
- testCtx.TestResponce<NPDisk::TEvChunkForgetResult>(new NPDisk::TEvChunkForget(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound),
- NKikimrProto::OK);
- }
- }
- }
}
} // namespace NKikimr
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.cpp
new file mode 100644
index 00000000000..3777db36499
--- /dev/null
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.cpp
@@ -0,0 +1,78 @@
+#include "blobstorage_pdisk_ut_env.h"
+
+namespace NKikimr {
+void RecreateOwner(TActorTestContext& testCtx, TVDiskIDOwnerRound& vdisk) {
+ testCtx.TestResponce<NPDisk::TEvSlayResult>(
+ new NPDisk::TEvSlay(vdisk.VDiskID, vdisk.OwnerRound + 1, 0, 0),
+ NKikimrProto::OK);
+
+ const auto evInitRes = testCtx.TestResponce<NPDisk::TEvYardInitResult>(
+ new NPDisk::TEvYardInit(vdisk.OwnerRound + 1, vdisk.VDiskID, testCtx.TestCtx.PDiskGuid),
+ NKikimrProto::OK);
+
+ vdisk.OwnerRound = evInitRes->PDiskParams->OwnerRound;
+}
+
+void TestChunkWriteReleaseRun() {
+ TActorTestContext testCtx(false);
+
+ const TVDiskID vDiskID(0, 1, 0, 0, 0);
+ const auto evInitRes = testCtx.TestResponce<NPDisk::TEvYardInitResult>(
+ new NPDisk::TEvYardInit(2, vDiskID, testCtx.TestCtx.PDiskGuid),
+ NKikimrProto::OK);
+ const auto evReserveRes = testCtx.TestResponce<NPDisk::TEvChunkReserveResult>(
+ new NPDisk::TEvChunkReserve(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 1),
+ NKikimrProto::OK);
+ UNIT_ASSERT(evReserveRes->ChunkIds.size() == 1);
+
+ const ui32 reservedChunk = evReserveRes->ChunkIds.front();
+ NPDisk::TCommitRecord commitRecord;
+ commitRecord.CommitChunks.push_back(reservedChunk);
+ testCtx.TestResponce<NPDisk::TEvLogResult>(
+ new NPDisk::TEvLog(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 0, commitRecord,
+ TString(), TLsnSeg(1, 1), nullptr),
+ NKikimrProto::OK);
+
+ const auto evControlRes = testCtx.TestResponce<NPDisk::TEvYardControlResult>(
+ new NPDisk::TEvYardControl(NPDisk::TEvYardControl::GetPDiskPointer, nullptr),
+ NKikimrProto::OK);
+ auto *pDisk = reinterpret_cast<NPDisk::TPDisk*>(evControlRes->Cookie);
+ pDisk->PDiskThread.StopSync();
+
+ {
+ NPDisk::TCommitRecord commitRecord;
+ commitRecord.DeleteChunks.push_back(reservedChunk);
+ NPDisk::TEvLog ev(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 0, commitRecord,
+ TString(), TLsnSeg(2, 2), nullptr);
+ NPDisk::TLogWrite *log = new NPDisk::TLogWrite(ev, testCtx.Sender, 0, {}, {});
+ bool ok = pDisk->PreprocessRequest(log);
+ UNIT_ASSERT(ok);
+ pDisk->RouteRequest(log);
+ }
+ pDisk->ProcessLogWriteQueueAndCommits();
+
+ {
+ TString chunkWriteData = PrepareData(1024);
+ NPDisk::TEvChunkWrite ev(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, reservedChunk,
+ 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(chunkWriteData), nullptr, false, 0);
+ NPDisk::TChunkWrite *chunkWrite = new NPDisk::TChunkWrite(ev, testCtx.Sender, {}, {});
+ bool ok = pDisk->PreprocessRequest(chunkWrite);
+ UNIT_ASSERT(!ok);
+ }
+
+ pDisk->ProcessChunkWriteQueue();
+
+ testCtx.TestResponce<NPDisk::TEvLogResult>(
+ nullptr,
+ NKikimrProto::OK);
+ testCtx.TestResponce<NPDisk::TEvChunkWriteResult>(
+ nullptr,
+ NKikimrProto::ERROR);
+
+ testCtx.Send(new NActors::TEvents::TEvPoisonPill());
+}
+
+std::atomic<ui64> TVDiskMock::Idx = 0;
+std::atomic<ui64> TVDiskMock::OwnerRound = 2;
+
+} \ No newline at end of file
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
new file mode 100644
index 00000000000..37d157a2b83
--- /dev/null
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
@@ -0,0 +1,284 @@
+#pragma once
+#include "blobstorage_pdisk_ut.h"
+
+#include "blobstorage_pdisk_abstract.h"
+#include "blobstorage_pdisk_impl.h"
+#include "blobstorage_pdisk_ut_env.h"
+
+#include <ydb/core/blobstorage/crypto/default.h>
+#include <ydb/core/testlib/actors/test_runtime.h>
+
+#include <util/system/hp_timer.h>
+
+namespace NKikimr {
+
+struct TActorTestContext {
+private:
+ std::optional<TActorId> PDiskActor;
+ THolder<TTestActorRuntime> Runtime;
+ std::shared_ptr<NPDisk::IIoContextFactory> IoContext;
+ NPDisk::TPDisk *PDisk = nullptr;
+
+public:
+ TActorId Sender;
+ NPDisk::TKey MainKey = NPDisk::YdbDefaultPDiskSequence;
+ TTestContext TestCtx{false, /*use sector map*/ true};
+
+ TIntrusivePtr<TPDiskConfig> DefaultPDiskConfig(bool isBad) {
+ TString path;
+ EntropyPool().Read(&TestCtx.PDiskGuid, sizeof(TestCtx.PDiskGuid));
+ ui64 formatGuid = TestCtx.PDiskGuid + static_cast<ui64>(isBad);
+ FormatPDiskForTest(path, formatGuid, MIN_CHUNK_SIZE, false, TestCtx.SectorMap);
+
+ ui64 pDiskCategory = 0;
+ TIntrusivePtr<TPDiskConfig> pDiskConfig = new TPDiskConfig(path, TestCtx.PDiskGuid, 1, pDiskCategory);
+ pDiskConfig->GetDriveDataSwitch = NKikimrBlobStorage::TPDiskConfig::DoNotTouch;
+ pDiskConfig->WriteCacheSwitch = NKikimrBlobStorage::TPDiskConfig::DoNotTouch;
+ pDiskConfig->ChunkSize = MIN_CHUNK_SIZE;
+ pDiskConfig->SectorMap = TestCtx.SectorMap;
+ pDiskConfig->EnableSectorEncryption = !pDiskConfig->SectorMap;
+ return pDiskConfig;
+ }
+
+ TActorTestContext(bool isBad)
+ : Runtime(new TTestActorRuntime(1, true))
+ {
+ auto appData = MakeHolder<TAppData>(0, 0, 0, 0, TMap<TString, ui32>(), nullptr, nullptr, nullptr, nullptr);
+ IoContext = std::make_shared<NPDisk::TIoContextFactoryOSS>();
+ appData->IoContextFactory = IoContext.get();
+
+ Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend());
+ Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}});
+ Runtime->SetLogPriority(NKikimrServices::BS_PDISK, NLog::PRI_NOTICE);
+ Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SYSLOG, NLog::PRI_NOTICE);
+ Runtime->SetLogPriority(NKikimrServices::BS_PDISK_TEST, NLog::PRI_DEBUG);
+ Sender = Runtime->AllocateEdgeActor();
+
+ TIntrusivePtr<TPDiskConfig> cfg = DefaultPDiskConfig(isBad);
+ UpdateConfigRecreatePDisk(cfg);
+ }
+
+ TIntrusivePtr<TPDiskConfig> GetPDiskConfig() {
+ return GetPDisk()->Cfg;
+ }
+
+ void UpdateConfigRecreatePDisk(TIntrusivePtr<TPDiskConfig> cfg) {
+ if (PDiskActor) {
+ TestResponce<NPDisk::TEvYardControlResult>(
+ new NPDisk::TEvYardControl(NPDisk::TEvYardControl::PDiskStop, nullptr),
+ NKikimrProto::OK);
+ PDisk = nullptr;
+ Runtime->Send(new IEventHandle(*PDiskActor, Sender, new TKikimrEvents::TEvPoisonPill));
+ }
+
+ auto mainCounters = TIntrusivePtr<::NMonitoring::TDynamicCounters>(new ::NMonitoring::TDynamicCounters());
+ IActor* pDiskActor = CreatePDisk(cfg.Get(), MainKey, mainCounters);
+ PDiskActor = Runtime->Register(pDiskActor);
+ }
+
+ void Send(IEventBase* ev) {
+ Runtime->Send(new IEventHandle(*PDiskActor, Sender, ev));
+ }
+
+ NPDisk::TPDisk *GetPDisk() {
+ if (!PDisk) {
+ // To be sure that pdisk actor is in StateOnline
+ TestResponce<NPDisk::TEvYardControlResult>(
+ new NPDisk::TEvYardControl(NPDisk::TEvYardControl::PDiskStart, &MainKey),
+ NKikimrProto::OK);
+
+ const auto evControlRes = TestResponce<NPDisk::TEvYardControlResult>(
+ new NPDisk::TEvYardControl(NPDisk::TEvYardControl::GetPDiskPointer, nullptr),
+ NKikimrProto::OK);
+ PDisk = reinterpret_cast<NPDisk::TPDisk*>(evControlRes->Cookie);
+ }
+ return PDisk;
+ }
+
+ template<typename T>
+ auto SafeRunOnPDisk(T&& f) {
+ TGuard<TMutex> g(GetPDisk()->StateMutex);
+ return f(GetPDisk());
+ }
+
+ void RestartPDiskSync() {
+ TestResponce<NPDisk::TEvYardControlResult>(
+ new NPDisk::TEvYardControl(NPDisk::TEvYardControl::PDiskStop, nullptr),
+ NKikimrProto::OK);
+ PDisk = nullptr;
+ // wait initialization and update this->PDisk
+ GetPDisk();
+ }
+
+ template<typename TRes>
+ THolder<TRes> Recv() {
+ return Runtime->GrabEdgeEvent<TRes>();
+ }
+
+ template<typename TRes>
+ THolder<TRes> TestResponce(IEventBase* ev, NKikimrProto::EReplyStatus status) {
+ if (ev) {
+ Send(ev);
+ }
+ THolder<TRes> evRes = Recv<TRes>();
+ UNIT_ASSERT_C(evRes->Status == status, evRes->ToString());
+ UNIT_ASSERT(status == NKikimrProto::OK || !evRes->ErrorReason.empty());
+
+ // Test that all ToString methods don't VERIFY
+ Cnull << evRes->ToString();
+ return evRes;
+ }
+};
+
+struct TVDiskIDOwnerRound {
+ TVDiskID VDiskID;
+ ui64 OwnerRound;
+};
+
+void RecreateOwner(TActorTestContext& testCtx, TVDiskIDOwnerRound& vdisk);
+
+enum class EChunkState {
+ UNKNOWN,
+ RESERVED,
+ COMMIT_INFLIGHT,
+ COMMITTED,
+ DELETE_INFLIGHT,
+ DELETED
+};
+
+struct TVDiskMock {
+ static std::atomic<ui64> Idx;
+ static std::atomic<ui64> OwnerRound;
+
+ TActorTestContext *TestCtx;
+ const TVDiskID VDiskID;
+ TIntrusivePtr<TPDiskParams> PDiskParams;
+ ui64 LastUsedLsn = 0;
+ ui64 FirstLsnToKeep = 1;
+
+ TMap<EChunkState, TSet<TChunkIdx>> Chunks;
+
+ TVDiskMock(TActorTestContext *testCtx)
+ : TestCtx(testCtx)
+ , VDiskID(Idx.fetch_add(1), 1, 0, 0, 0)
+ {}
+
+ TLsnSeg GetLsnSeg() {
+ ++LastUsedLsn;
+ return {LastUsedLsn, LastUsedLsn};
+ };
+
+ void InitFull() {
+ Init();
+ ReadLog();
+ SendEvLogImpl(1, {}, true);
+ }
+
+ void Init() {
+ const auto evInitRes = TestCtx->TestResponce<NPDisk::TEvYardInitResult>(
+ new NPDisk::TEvYardInit(OwnerRound.fetch_add(1), VDiskID, TestCtx->TestCtx.PDiskGuid),
+ NKikimrProto::OK);
+ PDiskParams = evInitRes->PDiskParams;
+
+ TSet<TChunkIdx> commited = Chunks[EChunkState::COMMITTED];
+ for (TChunkIdx chunk : evInitRes->OwnedChunks) {
+ UNIT_ASSERT_C(commited.count(chunk), "misowned chunk# " << chunk);
+ commited.erase(chunk);
+ }
+ UNIT_ASSERT_C(commited.empty(), "there are leaked chunks# " << FormatList(commited));
+ }
+
+
+ void ReserveChunk() {
+ const auto evReserveRes = TestCtx->TestResponce<NPDisk::TEvChunkReserveResult>(
+ new NPDisk::TEvChunkReserve(PDiskParams->Owner, PDiskParams->OwnerRound, 1),
+ NKikimrProto::OK);
+ UNIT_ASSERT(evReserveRes->ChunkIds.size() == 1);
+ const ui32 reservedChunk = evReserveRes->ChunkIds.front();
+ Chunks[EChunkState::RESERVED].emplace(reservedChunk);
+ }
+
+ void CommitReservedChunks() {
+ auto& reservedChunks = Chunks[EChunkState::RESERVED];
+ NPDisk::TCommitRecord rec;
+ rec.CommitChunks = TVector<TChunkIdx>(reservedChunks.begin(), reservedChunks.end());
+ SendEvLogImpl(1, rec);
+ Chunks[EChunkState::COMMITTED].insert(reservedChunks.begin(), reservedChunks.end());
+ reservedChunks.clear();
+ }
+
+ void DeleteCommitedChunks() {
+ auto& commited = Chunks[EChunkState::COMMITTED];
+ NPDisk::TCommitRecord rec;
+ rec.DeleteChunks = TVector<TChunkIdx>(commited.begin(), commited.end());
+ SendEvLogImpl(1, rec);
+ Chunks[EChunkState::DELETED].insert(commited.begin(), commited.end());
+ commited.clear();
+ }
+
+ ui64 ReadLog(std::function<void(const NPDisk::TLogRecord&)> logResCallback = {}) {
+ ui64 logRecordsRead = 0;
+
+ NPDisk::TLogPosition position{0, 0};
+ bool endOfLog = false;
+ do {
+ UNIT_ASSERT(PDiskParams);
+ auto logReadRes = TestCtx->TestResponce<NPDisk::TEvReadLogResult>(
+ new NPDisk::TEvReadLog(PDiskParams->Owner, PDiskParams->OwnerRound, position),
+ NKikimrProto::OK);
+ UNIT_ASSERT(position == logReadRes->Position);
+ for (const NPDisk::TLogRecord& rec : logReadRes->Results) {
+ ++logRecordsRead;
+ if (logResCallback) {
+ logResCallback(rec);
+ }
+ LastUsedLsn = Max(LastUsedLsn, rec.Lsn);
+ }
+ position = logReadRes->NextPosition;
+ endOfLog = logReadRes->IsEndOfLog;
+ } while (!endOfLog);
+
+ return logRecordsRead;
+ }
+
+ void SendEvLogSync(const ui64 size = 128) {
+ SendEvLogImpl(size, {}, false);
+ }
+
+ void CutLogAllButOne() {
+ SendEvLogImpl(1, LastUsedLsn + 1, true);
+ }
+
+ ui64 OwnedLogRecords() const {
+ return LastUsedLsn + 1 - FirstLsnToKeep;
+ }
+
+private:
+ void SendEvLogImpl(const ui64 size, TMaybe<NPDisk::TCommitRecord> commitRec) {
+ auto evLog = MakeHolder<NPDisk::TEvLog>(PDiskParams->Owner, PDiskParams->OwnerRound, 0, PrepareData(size),
+ GetLsnSeg(), nullptr);
+
+ if (commitRec) {
+ evLog->Signature.SetCommitRecord();
+ evLog->CommitRecord = std::move(*commitRec);
+ }
+
+ TestCtx->TestResponce<NPDisk::TEvLogResult>(evLog.Release(), NKikimrProto::OK);
+ }
+
+ void SendEvLogImpl(const ui64 size, TMaybe<ui64> firstLsnToKeep, bool isStartingPoint) {
+
+ TMaybe<NPDisk::TCommitRecord> rec;
+
+ if (firstLsnToKeep || isStartingPoint) {
+ rec = NPDisk::TCommitRecord();
+ rec->FirstLsnToKeep = firstLsnToKeep.GetOrElse(0);
+ FirstLsnToKeep = Max(FirstLsnToKeep, firstLsnToKeep.GetOrElse(0));
+ rec->IsStartingPoint = isStartingPoint;
+ }
+ SendEvLogImpl(size, rec);
+ }
+};
+
+void TestChunkWriteReleaseRun();
+} \ No newline at end of file
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp
new file mode 100644
index 00000000000..c2bfb201f2c
--- /dev/null
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp
@@ -0,0 +1,297 @@
+#include "blobstorage_pdisk_ut.h"
+
+#include "blobstorage_pdisk_abstract.h"
+#include "blobstorage_pdisk_impl.h"
+#include "blobstorage_pdisk_ut_env.h"
+
+#include <ydb/core/blobstorage/crypto/default.h>
+#include <ydb/core/testlib/actors/test_runtime.h>
+
+#include <util/system/hp_timer.h>
+
+namespace NKikimr {
+
+Y_UNIT_TEST_SUITE(TPDiskRaces) {
+ Y_UNIT_TEST(KillOwnerWhileDeletingChunk) {
+ THPTimer timer;
+ ui32 timeLimit = 20;
+ while (timer.Passed() < timeLimit) {
+ TActorTestContext testCtx(false);
+
+ auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) {
+ auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1),
+ mock.GetLsnSeg(), nullptr);
+ evLog->Signature.SetCommitRecord();
+ evLog->CommitRecord = std::move(rec);
+ testCtx.Send(evLog.Release());
+ };
+
+ TVDiskMock mock(&testCtx);
+ mock.Init();
+
+ ui32 vdisksNum = 100;
+ std::vector<TVDiskMock> mocks;
+ for (ui32 i = 0; i < vdisksNum; ++i) {
+ mocks.push_back(TVDiskMock(&testCtx));
+ mocks[i].Init();
+ }
+
+ ui32 reservedChunks = 10;
+
+ for (ui32 i = 0; i < reservedChunks; ++i) {
+ mock.ReserveChunk();
+ }
+ mock.CommitReservedChunks();
+
+ while (mock.Chunks[EChunkState::COMMITTED].size() > 0) {
+ auto it = mock.Chunks[EChunkState::COMMITTED].begin();
+ NPDisk::TCommitRecord rec;
+ rec.DeleteChunks.push_back(*it);
+ logNoTest(mock, rec);
+ mock.Chunks[EChunkState::COMMITTED].erase(it);
+ }
+
+ testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound));
+
+ for (ui32 c = 0; ; c = (c + 1) % mocks.size()) {
+ testCtx.Send(new NPDisk::TEvChunkReserve(mocks[c].PDiskParams->Owner, mocks[c].PDiskParams->OwnerRound, 1));
+ THolder<NPDisk::TEvChunkReserveResult> evRes = testCtx.Recv<NPDisk::TEvChunkReserveResult>();
+ if (!evRes || evRes->Status != NKikimrProto::OK) {
+ break;
+ }
+ const ui32 reservedChunk = evRes->ChunkIds.front();
+ auto& reservedChunks = mocks[c].Chunks[EChunkState::RESERVED];
+ reservedChunks.emplace(reservedChunk);
+
+ NPDisk::TCommitRecord rec;
+ rec.CommitChunks.push_back(*reservedChunks.begin());
+ logNoTest(mocks[c], rec);
+ reservedChunks.clear();
+ }
+ testCtx.Recv<NPDisk::TEvHarakiriResult>();
+ }
+ }
+
+ Y_UNIT_TEST(KillOwnerWhileDeletingChunkWithInflight) {
+ THPTimer timer;
+ ui32 timeLimit = 20;
+ while (timer.Passed() < timeLimit) {
+ TActorTestContext testCtx(false);
+
+ auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) {
+ auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1),
+ mock.GetLsnSeg(), nullptr);
+ evLog->Signature.SetCommitRecord();
+ evLog->CommitRecord = std::move(rec);
+ testCtx.Send(evLog.Release());
+ };
+
+ TVDiskMock mock(&testCtx);
+ mock.Init();
+
+ ui32 vdisksNum = 100;
+ std::vector<TVDiskMock> mocks;
+ for (ui32 i = 0; i < vdisksNum; ++i) {
+ mocks.push_back(TVDiskMock(&testCtx));
+ mocks[i].Init();
+ }
+
+ ui32 reservedChunks = 10;
+ for (ui32 i = 0; i < reservedChunks; ++i) {
+ mock.ReserveChunk();
+ }
+ mock.CommitReservedChunks();
+ TVector<TChunkIdx> chunkIds(mock.Chunks[EChunkState::COMMITTED].begin(), mock.Chunks[EChunkState::COMMITTED].end());
+
+ ui32 inflight = 50;
+
+ while (mock.Chunks[EChunkState::COMMITTED].size() > 0) {
+ auto it = mock.Chunks[EChunkState::COMMITTED].begin();
+ for (ui32 i = 0; i < inflight; ++i) {
+ TString data = "HATE. LET ME TELL YOU HOW MUCH I'VE COME TO HATE YOU SINCE I BEGAN TO LIVE...";
+ testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
+ *it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), nullptr, false, 0));
+ }
+ NPDisk::TCommitRecord rec;
+ rec.DeleteChunks.push_back(*it);
+ logNoTest(mock, rec);
+ mock.Chunks[EChunkState::COMMITTED].erase(it);
+ }
+ mock.Chunks[EChunkState::COMMITTED].clear();
+
+ testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound));
+
+ for (ui32 c = 0; ; c = (c + 1) % mocks.size()) {
+ testCtx.Send(new NPDisk::TEvChunkReserve(mocks[c].PDiskParams->Owner, mocks[c].PDiskParams->OwnerRound, 1));
+ THolder<NPDisk::TEvChunkReserveResult> evRes = testCtx.Recv<NPDisk::TEvChunkReserveResult>();
+ if (!evRes || evRes->Status != NKikimrProto::OK) {
+ break;
+ }
+ const ui32 reservedChunk = evRes->ChunkIds.front();
+ auto& reservedChunks = mocks[c].Chunks[EChunkState::RESERVED];
+ reservedChunks.emplace(reservedChunk);
+
+ NPDisk::TCommitRecord rec;
+ rec.CommitChunks.push_back(*reservedChunks.begin());
+ logNoTest(mocks[c], rec);
+ reservedChunks.clear();
+ }
+ testCtx.Recv<NPDisk::TEvHarakiriResult>();
+ }
+ }
+
+ Y_UNIT_TEST(DecommitWithInflight) {
+ THPTimer timer;
+ ui32 timeLimit = 20;
+ while (timer.Passed() < timeLimit) {
+ TActorTestContext testCtx(false);
+ ui32 dataSize = 1024;
+
+ auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) {
+ auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1),
+ mock.GetLsnSeg(), nullptr);
+ evLog->Signature.SetCommitRecord();
+ evLog->CommitRecord = std::move(rec);
+ testCtx.Send(evLog.Release());
+ };
+
+ auto sendManyReads = [&](TVDiskMock& mock, TChunkIdx chunk, ui32 number, ui64& cookie) {
+ for (ui32 i = 0; i < number; ++i) {
+ testCtx.Send(new NPDisk::TEvChunkRead(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
+ chunk, 0, dataSize, 0, (void*)(cookie++)));
+ }
+ };
+
+ auto sendManyWrites = [&](TVDiskMock& mock, TChunkIdx chunk, ui32 number, ui64& cookie) {
+ for (ui32 i = 0; i < number; ++i) {
+ TString data = PrepareData(dataSize);
+ testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
+ chunk, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), (void*)(cookie++), false, 0));
+ }
+ };
+
+ TVDiskMock mock(&testCtx);
+ mock.Init();
+
+ ui32 reservedChunks = 10;
+ for (ui32 i = 0; i < reservedChunks; ++i) {
+ mock.ReserveChunk();
+ }
+
+ {
+ auto& chunkIds = mock.Chunks[EChunkState::COMMITTED];
+ for (auto it = chunkIds.begin(); it != chunkIds.end(); ++it) {
+ TString data = PrepareData(dataSize);
+ testCtx.TestResponce<NPDisk::TEvChunkWriteResult>(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
+ *it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), (void*)10, false, 0),
+ NKikimrProto::OK);
+ }
+ }
+
+ mock.CommitReservedChunks();
+
+ ui32 inflight = 50;
+ auto& chunkIds = mock.Chunks[EChunkState::COMMITTED];
+
+ ui64 cookie = 0;
+ for (auto it = chunkIds.begin(); it != chunkIds.end(); ++it) {
+ sendManyWrites(mock, *it, inflight, cookie);
+ sendManyReads(mock, *it, inflight, cookie);
+ NPDisk::TCommitRecord rec;
+ rec.DeleteChunks.push_back(*it);
+ rec.DeleteToDecommitted = true;
+ logNoTest(mock, rec);
+ sendManyWrites(mock, *it, inflight, cookie);
+ sendManyReads(mock, *it, inflight, cookie);
+ }
+ mock.Chunks[EChunkState::COMMITTED].clear();
+
+
+ for (ui32 i = 0; i < inflight * 2 * reservedChunks; ++i) {
+ {
+ auto res = testCtx.Recv<NPDisk::TEvChunkReadResult>();
+ UNIT_ASSERT_VALUES_EQUAL_C(res->Status, NKikimrProto::OK, res->ToString());
+ }
+ {
+ auto res = testCtx.Recv<NPDisk::TEvChunkWriteResult>();
+ UNIT_ASSERT_VALUES_EQUAL_C(res->Status, NKikimrProto::OK, res->ToString());
+ }
+ }
+ for (ui32 i = 0; i < reservedChunks; ++i) {
+ testCtx.TestResponce<NPDisk::TEvChunkForgetResult>(new NPDisk::TEvChunkForget(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound),
+ NKikimrProto::OK);
+ }
+ }
+ }
+
+ Y_UNIT_TEST(KillOwnerWhileDecommittingChunksWithInflight) {
+ THPTimer timer;
+ ui32 timeLimit = 20;
+ while (timer.Passed() < timeLimit) {
+ TActorTestContext testCtx(false);
+
+ auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) {
+ auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1),
+ mock.GetLsnSeg(), nullptr);
+ evLog->Signature.SetCommitRecord();
+ evLog->CommitRecord = std::move(rec);
+ testCtx.Send(evLog.Release());
+ };
+
+ TVDiskMock mock(&testCtx);
+ mock.Init();
+
+ ui32 vdisksNum = 100;
+ std::vector<TVDiskMock> mocks;
+ for (ui32 i = 0; i < vdisksNum; ++i) {
+ mocks.push_back(TVDiskMock(&testCtx));
+ mocks[i].Init();
+ }
+
+ ui32 reservedChunks = 10;
+ for (ui32 i = 0; i < reservedChunks; ++i) {
+ mock.ReserveChunk();
+ }
+ mock.CommitReservedChunks();
+ TVector<TChunkIdx> chunkIds(mock.Chunks[EChunkState::COMMITTED].begin(), mock.Chunks[EChunkState::COMMITTED].end());
+
+ ui32 inflight = 50;
+
+ while (mock.Chunks[EChunkState::COMMITTED].size() > 0) {
+ auto it = mock.Chunks[EChunkState::COMMITTED].begin();
+ for (ui32 i = 0; i < inflight; ++i) {
+ TString data = "HATE. LET ME TELL YOU HOW MUCH I'VE COME TO HATE YOU SINCE I BEGAN TO LIVE...";
+ testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
+ *it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), nullptr, false, 0));
+ }
+ NPDisk::TCommitRecord rec;
+ rec.DeleteChunks.push_back(*it);
+ rec.DeleteToDecommitted = true;
+ logNoTest(mock, rec);
+ mock.Chunks[EChunkState::COMMITTED].erase(it);
+ }
+ mock.Chunks[EChunkState::COMMITTED].clear();
+
+ testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound));
+
+ for (ui32 c = 0; ; c = (c + 1) % mocks.size()) {
+ testCtx.Send(new NPDisk::TEvChunkReserve(mocks[c].PDiskParams->Owner, mocks[c].PDiskParams->OwnerRound, 1));
+ THolder<NPDisk::TEvChunkReserveResult> evRes = testCtx.Recv<NPDisk::TEvChunkReserveResult>();
+ if (!evRes || evRes->Status != NKikimrProto::OK) {
+ break;
+ }
+ const ui32 reservedChunk = evRes->ChunkIds.front();
+ auto& reservedChunks = mocks[c].Chunks[EChunkState::RESERVED];
+ reservedChunks.emplace(reservedChunk);
+
+ NPDisk::TCommitRecord rec;
+ rec.CommitChunks.push_back(*reservedChunks.begin());
+ logNoTest(mocks[c], rec);
+ reservedChunks.clear();
+ }
+ testCtx.Recv<NPDisk::TEvHarakiriResult>();
+ }
+ }
+}
+
+}
diff --git a/ydb/core/blobstorage/pdisk/ut/CMakeLists.darwin.txt b/ydb/core/blobstorage/pdisk/ut/CMakeLists.darwin.txt
index 2412431a31e..056fc7ed795 100644
--- a/ydb/core/blobstorage/pdisk/ut/CMakeLists.darwin.txt
+++ b/ydb/core/blobstorage/pdisk/ut/CMakeLists.darwin.txt
@@ -34,6 +34,8 @@ target_sources(ydb-core-blobstorage-pdisk-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_crypto_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_helpers.cpp
diff --git a/ydb/core/blobstorage/pdisk/ut/CMakeLists.linux.txt b/ydb/core/blobstorage/pdisk/ut/CMakeLists.linux.txt
index 41b649904d4..2665fa336a8 100644
--- a/ydb/core/blobstorage/pdisk/ut/CMakeLists.linux.txt
+++ b/ydb/core/blobstorage/pdisk/ut/CMakeLists.linux.txt
@@ -38,6 +38,8 @@ target_sources(ydb-core-blobstorage-pdisk-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_crypto_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_helpers.cpp