diff options
author | serg-belyakov <serg-belyakov@yandex-team.com> | 2022-09-01 14:26:03 +0300 |
---|---|---|
committer | serg-belyakov <serg-belyakov@yandex-team.com> | 2022-09-01 14:26:03 +0300 |
commit | 37045415adbd50120bacf7a5bb00a69d1c93849d (patch) | |
tree | 462d2e38e2c7fc790c5b9b286f9175ab0ad296af | |
parent | 9da522eaf87dd50f3501b0e472fd2ec03d3f2076 (diff) | |
download | ydb-37045415adbd50120bacf7a5bb00a69d1c93849d.tar.gz |
Race test,
Separate race tests
6 files changed, 664 insertions, 552 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp index cc7e24858a8..4266d75266d 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp @@ -2,6 +2,7 @@ #include "blobstorage_pdisk_abstract.h" #include "blobstorage_pdisk_impl.h" +#include "blobstorage_pdisk_ut_env.h" #include <ydb/core/blobstorage/crypto/default.h> #include <ydb/core/testlib/actors/test_runtime.h> @@ -42,124 +43,6 @@ Y_UNIT_TEST_SUITE(TPDiskTest) { UNIT_ASSERT(NKikimrBlobStorage::TPDiskState::DeviceIoError == 13); } -struct TActorTestContext { -private: - std::optional<TActorId> PDiskActor; - THolder<TTestActorRuntime> Runtime; - std::shared_ptr<NPDisk::IIoContextFactory> IoContext; - NPDisk::TPDisk *PDisk = nullptr; - -public: - TActorId Sender; - NPDisk::TKey MainKey = NPDisk::YdbDefaultPDiskSequence; - TTestContext TestCtx{false, /*use sector map*/ true}; - - TIntrusivePtr<TPDiskConfig> DefaultPDiskConfig(bool isBad) { - TString path; - EntropyPool().Read(&TestCtx.PDiskGuid, sizeof(TestCtx.PDiskGuid)); - ui64 formatGuid = TestCtx.PDiskGuid + static_cast<ui64>(isBad); - FormatPDiskForTest(path, formatGuid, MIN_CHUNK_SIZE, false, TestCtx.SectorMap); - - ui64 pDiskCategory = 0; - TIntrusivePtr<TPDiskConfig> pDiskConfig = new TPDiskConfig(path, TestCtx.PDiskGuid, 1, pDiskCategory); - pDiskConfig->GetDriveDataSwitch = NKikimrBlobStorage::TPDiskConfig::DoNotTouch; - pDiskConfig->WriteCacheSwitch = NKikimrBlobStorage::TPDiskConfig::DoNotTouch; - pDiskConfig->ChunkSize = MIN_CHUNK_SIZE; - pDiskConfig->SectorMap = TestCtx.SectorMap; - pDiskConfig->EnableSectorEncryption = !pDiskConfig->SectorMap; - return pDiskConfig; - } - - TActorTestContext(bool isBad) - : Runtime(new TTestActorRuntime(1, true)) - { - auto appData = MakeHolder<TAppData>(0, 0, 0, 0, TMap<TString, ui32>(), nullptr, nullptr, nullptr, nullptr); - IoContext = std::make_shared<NPDisk::TIoContextFactoryOSS>(); - appData->IoContextFactory = IoContext.get(); - - Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend()); - Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}}); - Runtime->SetLogPriority(NKikimrServices::BS_PDISK, NLog::PRI_NOTICE); - Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SYSLOG, NLog::PRI_NOTICE); - Runtime->SetLogPriority(NKikimrServices::BS_PDISK_TEST, NLog::PRI_DEBUG); - Sender = Runtime->AllocateEdgeActor(); - - TIntrusivePtr<TPDiskConfig> cfg = DefaultPDiskConfig(isBad); - UpdateConfigRecreatePDisk(cfg); - } - - TIntrusivePtr<TPDiskConfig> GetPDiskConfig() { - return GetPDisk()->Cfg; - } - - void UpdateConfigRecreatePDisk(TIntrusivePtr<TPDiskConfig> cfg) { - if (PDiskActor) { - TestResponce<NPDisk::TEvYardControlResult>( - new NPDisk::TEvYardControl(NPDisk::TEvYardControl::PDiskStop, nullptr), - NKikimrProto::OK); - PDisk = nullptr; - Runtime->Send(new IEventHandle(*PDiskActor, Sender, new TKikimrEvents::TEvPoisonPill)); - } - - auto mainCounters = TIntrusivePtr<::NMonitoring::TDynamicCounters>(new ::NMonitoring::TDynamicCounters()); - IActor* pDiskActor = CreatePDisk(cfg.Get(), MainKey, mainCounters); - PDiskActor = Runtime->Register(pDiskActor); - } - - void Send(IEventBase* ev) { - Runtime->Send(new IEventHandle(*PDiskActor, Sender, ev)); - } - - NPDisk::TPDisk *GetPDisk() { - if (!PDisk) { - // To be sure that pdisk actor is in StateOnline - TestResponce<NPDisk::TEvYardControlResult>( - new NPDisk::TEvYardControl(NPDisk::TEvYardControl::PDiskStart, &MainKey), - NKikimrProto::OK); - - const auto evControlRes = TestResponce<NPDisk::TEvYardControlResult>( - new NPDisk::TEvYardControl(NPDisk::TEvYardControl::GetPDiskPointer, nullptr), - NKikimrProto::OK); - PDisk = reinterpret_cast<NPDisk::TPDisk*>(evControlRes->Cookie); - } - return PDisk; - } - - template<typename T> - auto SafeRunOnPDisk(T&& f) { - TGuard<TMutex> g(GetPDisk()->StateMutex); - return f(GetPDisk()); - } - - void RestartPDiskSync() { - TestResponce<NPDisk::TEvYardControlResult>( - new NPDisk::TEvYardControl(NPDisk::TEvYardControl::PDiskStop, nullptr), - NKikimrProto::OK); - PDisk = nullptr; - // wait initialization and update this->PDisk - GetPDisk(); - } - - template<typename TRes> - THolder<TRes> Recv() { - return Runtime->GrabEdgeEvent<TRes>(); - } - - template<typename TRes> - THolder<TRes> TestResponce(IEventBase* ev, NKikimrProto::EReplyStatus status) { - if (ev) { - Send(ev); - } - THolder<TRes> evRes = Recv<TRes>(); - UNIT_ASSERT_C(evRes->Status == status, evRes->ToString()); - UNIT_ASSERT(status == NKikimrProto::OK || !evRes->ErrorReason.empty()); - - // Test that all ToString methods don't VERIFY - Cnull << evRes->ToString(); - return evRes; - } -}; - Y_UNIT_TEST(TestPDiskActorErrorState) { TActorTestContext testCtx(true); @@ -222,65 +105,6 @@ public: testCtx.Send(new NActors::TEvents::TEvPoisonPill()); } - void TestChunkWriteReleaseRun() { - TActorTestContext testCtx(false); - - const TVDiskID vDiskID(0, 1, 0, 0, 0); - const auto evInitRes = testCtx.TestResponce<NPDisk::TEvYardInitResult>( - new NPDisk::TEvYardInit(2, vDiskID, testCtx.TestCtx.PDiskGuid), - NKikimrProto::OK); - const auto evReserveRes = testCtx.TestResponce<NPDisk::TEvChunkReserveResult>( - new NPDisk::TEvChunkReserve(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 1), - NKikimrProto::OK); - UNIT_ASSERT(evReserveRes->ChunkIds.size() == 1); - - const ui32 reservedChunk = evReserveRes->ChunkIds.front(); - NPDisk::TCommitRecord commitRecord; - commitRecord.CommitChunks.push_back(reservedChunk); - testCtx.TestResponce<NPDisk::TEvLogResult>( - new NPDisk::TEvLog(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 0, commitRecord, - TString(), TLsnSeg(1, 1), nullptr), - NKikimrProto::OK); - - const auto evControlRes = testCtx.TestResponce<NPDisk::TEvYardControlResult>( - new NPDisk::TEvYardControl(NPDisk::TEvYardControl::GetPDiskPointer, nullptr), - NKikimrProto::OK); - auto *pDisk = reinterpret_cast<NPDisk::TPDisk*>(evControlRes->Cookie); - pDisk->PDiskThread.StopSync(); - - { - NPDisk::TCommitRecord commitRecord; - commitRecord.DeleteChunks.push_back(reservedChunk); - NPDisk::TEvLog ev(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 0, commitRecord, - TString(), TLsnSeg(2, 2), nullptr); - NPDisk::TLogWrite *log = new NPDisk::TLogWrite(ev, testCtx.Sender, 0, {}, {}); - bool ok = pDisk->PreprocessRequest(log); - UNIT_ASSERT(ok); - pDisk->RouteRequest(log); - } - pDisk->ProcessLogWriteQueueAndCommits(); - - { - TString chunkWriteData = PrepareData(1024); - NPDisk::TEvChunkWrite ev(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, reservedChunk, - 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(chunkWriteData), nullptr, false, 0); - NPDisk::TChunkWrite *chunkWrite = new NPDisk::TChunkWrite(ev, testCtx.Sender, {}, {}); - bool ok = pDisk->PreprocessRequest(chunkWrite); - UNIT_ASSERT(!ok); - } - - pDisk->ProcessChunkWriteQueue(); - - testCtx.TestResponce<NPDisk::TEvLogResult>( - nullptr, - NKikimrProto::OK); - testCtx.TestResponce<NPDisk::TEvChunkWriteResult>( - nullptr, - NKikimrProto::ERROR); - - testCtx.Send(new NActors::TEvents::TEvPoisonPill()); - } - Y_UNIT_TEST(TestChunkWriteRelease) { for (ui32 i = 0; i < 16; ++i) { TestChunkWriteReleaseRun(); @@ -324,23 +148,6 @@ public: } } -struct TVDiskIDOwnerRound { - TVDiskID VDiskID; - ui64 OwnerRound; -}; - -void RecreateOwner(TActorTestContext& testCtx, TVDiskIDOwnerRound& vdisk) { - testCtx.TestResponce<NPDisk::TEvSlayResult>( - new NPDisk::TEvSlay(vdisk.VDiskID, vdisk.OwnerRound + 1, 0, 0), - NKikimrProto::OK); - - const auto evInitRes = testCtx.TestResponce<NPDisk::TEvYardInitResult>( - new NPDisk::TEvYardInit(vdisk.OwnerRound + 1, vdisk.VDiskID, testCtx.TestCtx.PDiskGuid), - NKikimrProto::OK); - - vdisk.OwnerRound = evInitRes->PDiskParams->OwnerRound; -} - Y_UNIT_TEST(TestPDiskManyOwnersInitiation) { TActorTestContext testCtx(false); @@ -375,152 +182,6 @@ void RecreateOwner(TActorTestContext& testCtx, TVDiskIDOwnerRound& vdisk) { } } -enum class EChunkState { - UNKNOWN, - RESERVED, - COMMIT_INFLIGHT, - COMMITTED, - DELETE_INFLIGHT, - DELETED -}; - -struct TVDiskMock { - static std::atomic<ui64> Idx; - static std::atomic<ui64> OwnerRound; - - TActorTestContext *TestCtx; - const TVDiskID VDiskID; - TIntrusivePtr<TPDiskParams> PDiskParams; - ui64 LastUsedLsn = 0; - ui64 FirstLsnToKeep = 1; - - TMap<EChunkState, TSet<TChunkIdx>> Chunks; - - TVDiskMock(TActorTestContext *testCtx) - : TestCtx(testCtx) - , VDiskID(Idx.fetch_add(1), 1, 0, 0, 0) - {} - - TLsnSeg GetLsnSeg() { - ++LastUsedLsn; - return {LastUsedLsn, LastUsedLsn}; - }; - - void InitFull() { - Init(); - ReadLog(); - SendEvLogImpl(1, {}, true); - } - - void Init() { - const auto evInitRes = TestCtx->TestResponce<NPDisk::TEvYardInitResult>( - new NPDisk::TEvYardInit(OwnerRound.fetch_add(1), VDiskID, TestCtx->TestCtx.PDiskGuid), - NKikimrProto::OK); - PDiskParams = evInitRes->PDiskParams; - - TSet<TChunkIdx> commited = Chunks[EChunkState::COMMITTED]; - for (TChunkIdx chunk : evInitRes->OwnedChunks) { - UNIT_ASSERT_C(commited.count(chunk), "misowned chunk# " << chunk); - commited.erase(chunk); - } - UNIT_ASSERT_C(commited.empty(), "there are leaked chunks# " << FormatList(commited)); - } - - - void ReserveChunk() { - const auto evReserveRes = TestCtx->TestResponce<NPDisk::TEvChunkReserveResult>( - new NPDisk::TEvChunkReserve(PDiskParams->Owner, PDiskParams->OwnerRound, 1), - NKikimrProto::OK); - UNIT_ASSERT(evReserveRes->ChunkIds.size() == 1); - const ui32 reservedChunk = evReserveRes->ChunkIds.front(); - Chunks[EChunkState::RESERVED].emplace(reservedChunk); - } - - void CommitReservedChunks() { - auto& reservedChunks = Chunks[EChunkState::RESERVED]; - NPDisk::TCommitRecord rec; - rec.CommitChunks = TVector<TChunkIdx>(reservedChunks.begin(), reservedChunks.end()); - SendEvLogImpl(1, rec); - Chunks[EChunkState::COMMITTED].insert(reservedChunks.begin(), reservedChunks.end()); - reservedChunks.clear(); - } - - void DeleteCommitedChunks() { - auto& commited = Chunks[EChunkState::COMMITTED]; - NPDisk::TCommitRecord rec; - rec.DeleteChunks = TVector<TChunkIdx>(commited.begin(), commited.end()); - SendEvLogImpl(1, rec); - Chunks[EChunkState::DELETED].insert(commited.begin(), commited.end()); - commited.clear(); - } - - ui64 ReadLog(std::function<void(const NPDisk::TLogRecord&)> logResCallback = {}) { - ui64 logRecordsRead = 0; - - NPDisk::TLogPosition position{0, 0}; - bool endOfLog = false; - do { - UNIT_ASSERT(PDiskParams); - auto logReadRes = TestCtx->TestResponce<NPDisk::TEvReadLogResult>( - new NPDisk::TEvReadLog(PDiskParams->Owner, PDiskParams->OwnerRound, position), - NKikimrProto::OK); - UNIT_ASSERT(position == logReadRes->Position); - for (const NPDisk::TLogRecord& rec : logReadRes->Results) { - ++logRecordsRead; - if (logResCallback) { - logResCallback(rec); - } - LastUsedLsn = Max(LastUsedLsn, rec.Lsn); - } - position = logReadRes->NextPosition; - endOfLog = logReadRes->IsEndOfLog; - } while (!endOfLog); - - return logRecordsRead; - } - - void SendEvLogSync(const ui64 size = 128) { - SendEvLogImpl(size, {}, false); - } - - void CutLogAllButOne() { - SendEvLogImpl(1, LastUsedLsn + 1, true); - } - - ui64 OwnedLogRecords() const { - return LastUsedLsn + 1 - FirstLsnToKeep; - } - -private: - void SendEvLogImpl(const ui64 size, TMaybe<NPDisk::TCommitRecord> commitRec) { - auto evLog = MakeHolder<NPDisk::TEvLog>(PDiskParams->Owner, PDiskParams->OwnerRound, 0, PrepareData(size), - GetLsnSeg(), nullptr); - - if (commitRec) { - evLog->Signature.SetCommitRecord(); - evLog->CommitRecord = std::move(*commitRec); - } - - TestCtx->TestResponce<NPDisk::TEvLogResult>(evLog.Release(), NKikimrProto::OK); - } - - void SendEvLogImpl(const ui64 size, TMaybe<ui64> firstLsnToKeep, bool isStartingPoint) { - - TMaybe<NPDisk::TCommitRecord> rec; - - if (firstLsnToKeep || isStartingPoint) { - rec = NPDisk::TCommitRecord(); - rec->FirstLsnToKeep = firstLsnToKeep.GetOrElse(0); - FirstLsnToKeep = Max(FirstLsnToKeep, firstLsnToKeep.GetOrElse(0)); - rec->IsStartingPoint = isStartingPoint; - } - SendEvLogImpl(size, rec); - } -}; - -std::atomic<ui64> TVDiskMock::Idx = 0; -std::atomic<ui64> TVDiskMock::OwnerRound = 2; - Y_UNIT_TEST(TestVDiskMock) { TActorTestContext testCtx(false); TVDiskMock mock(&testCtx); @@ -1023,217 +684,5 @@ std::atomic<ui64> TVDiskMock::OwnerRound = 2; new NPDisk::TEvCheckSpace(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound), NKikimrProto::CORRUPTED); } - - Y_UNIT_TEST(KillOwnerWhileDeletingChunk) { - THPTimer timer; - ui32 timeLimit = 20; - while (timer.Passed() < timeLimit) { - TActorTestContext testCtx(false); - - auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) { - auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1), - mock.GetLsnSeg(), nullptr); - evLog->Signature.SetCommitRecord(); - evLog->CommitRecord = std::move(rec); - testCtx.Send(evLog.Release()); - }; - - TVDiskMock mock(&testCtx); - mock.Init(); - - ui32 vdisksNum = 100; - std::vector<TVDiskMock> mocks; - for (ui32 i = 0; i < vdisksNum; ++i) { - mocks.push_back(TVDiskMock(&testCtx)); - mocks[i].Init(); - } - - ui32 reservedChunks = 10; - - for (ui32 i = 0; i < reservedChunks; ++i) { - mock.ReserveChunk(); - } - mock.CommitReservedChunks(); - - while (mock.Chunks[EChunkState::COMMITTED].size() > 0) { - auto it = mock.Chunks[EChunkState::COMMITTED].begin(); - NPDisk::TCommitRecord rec; - rec.DeleteChunks.push_back(*it); - logNoTest(mock, rec); - mock.Chunks[EChunkState::COMMITTED].erase(it); - } - - testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound)); - - for (ui32 c = 0; ; c = (c + 1) % mocks.size()) { - testCtx.Send(new NPDisk::TEvChunkReserve(mocks[c].PDiskParams->Owner, mocks[c].PDiskParams->OwnerRound, 1)); - THolder<NPDisk::TEvChunkReserveResult> evRes = testCtx.Recv<NPDisk::TEvChunkReserveResult>(); - if (!evRes || evRes->Status != NKikimrProto::OK) { - break; - } - const ui32 reservedChunk = evRes->ChunkIds.front(); - auto& reservedChunks = mocks[c].Chunks[EChunkState::RESERVED]; - reservedChunks.emplace(reservedChunk); - - NPDisk::TCommitRecord rec; - rec.CommitChunks.push_back(*reservedChunks.begin()); - logNoTest(mocks[c], rec); - reservedChunks.clear(); - } - testCtx.Recv<NPDisk::TEvHarakiriResult>(); - } - } - - Y_UNIT_TEST(KillOwnerWhileDeletingChunkWithInflight) { - THPTimer timer; - ui32 timeLimit = 20; - while (timer.Passed() < timeLimit) { - TActorTestContext testCtx(false); - - auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) { - auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1), - mock.GetLsnSeg(), nullptr); - evLog->Signature.SetCommitRecord(); - evLog->CommitRecord = std::move(rec); - testCtx.Send(evLog.Release()); - }; - - TVDiskMock mock(&testCtx); - mock.Init(); - - ui32 vdisksNum = 100; - std::vector<TVDiskMock> mocks; - for (ui32 i = 0; i < vdisksNum; ++i) { - mocks.push_back(TVDiskMock(&testCtx)); - mocks[i].Init(); - } - - ui32 reservedChunks = 10; - for (ui32 i = 0; i < reservedChunks; ++i) { - mock.ReserveChunk(); - } - mock.CommitReservedChunks(); - TVector<TChunkIdx> chunkIds(mock.Chunks[EChunkState::COMMITTED].begin(), mock.Chunks[EChunkState::COMMITTED].end()); - - ui32 inflight = 300; - - while (mock.Chunks[EChunkState::COMMITTED].size() > 0) { - auto it = mock.Chunks[EChunkState::COMMITTED].begin(); - for (ui32 i = 0; i < inflight; ++i) { - TString data = "HATE. LET ME TELL YOU HOW MUCH I'VE COME TO HATE YOU SINCE I BEGAN TO LIVE..."; - testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, - *it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), nullptr, false, 0)); - } - NPDisk::TCommitRecord rec; - rec.DeleteChunks.push_back(*it); - logNoTest(mock, rec); - mock.Chunks[EChunkState::COMMITTED].erase(it); - } - mock.Chunks[EChunkState::COMMITTED].clear(); - - testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound)); - - for (ui32 c = 0; ; c = (c + 1) % mocks.size()) { - testCtx.Send(new NPDisk::TEvChunkReserve(mocks[c].PDiskParams->Owner, mocks[c].PDiskParams->OwnerRound, 1)); - THolder<NPDisk::TEvChunkReserveResult> evRes = testCtx.Recv<NPDisk::TEvChunkReserveResult>(); - if (!evRes || evRes->Status != NKikimrProto::OK) { - break; - } - const ui32 reservedChunk = evRes->ChunkIds.front(); - auto& reservedChunks = mocks[c].Chunks[EChunkState::RESERVED]; - reservedChunks.emplace(reservedChunk); - - NPDisk::TCommitRecord rec; - rec.CommitChunks.push_back(*reservedChunks.begin()); - logNoTest(mocks[c], rec); - reservedChunks.clear(); - } - testCtx.Recv<NPDisk::TEvHarakiriResult>(); - } - } - - Y_UNIT_TEST(DecommitWithInflight) { - THPTimer timer; - ui32 timeLimit = 20; - while (timer.Passed() < timeLimit) { - TActorTestContext testCtx(false); - ui32 dataSize = 1024; - - auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) { - auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1), - mock.GetLsnSeg(), nullptr); - evLog->Signature.SetCommitRecord(); - evLog->CommitRecord = std::move(rec); - testCtx.Send(evLog.Release()); - }; - - auto sendManyReads = [&](TVDiskMock& mock, TChunkIdx chunk, ui32 number, ui64& cookie) { - for (ui32 i = 0; i < number; ++i) { - testCtx.Send(new NPDisk::TEvChunkRead(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, - chunk, 0, dataSize, 0, (void*)(cookie++))); - } - }; - - auto sendManyWrites = [&](TVDiskMock& mock, TChunkIdx chunk, ui32 number, ui64& cookie) { - for (ui32 i = 0; i < number; ++i) { - TString data = PrepareData(dataSize); - testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, - chunk, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), (void*)(cookie++), false, 0)); - } - }; - - TVDiskMock mock(&testCtx); - mock.Init(); - - ui32 reservedChunks = 10; - for (ui32 i = 0; i < reservedChunks; ++i) { - mock.ReserveChunk(); - } - - { - auto& chunkIds = mock.Chunks[EChunkState::COMMITTED]; - for (auto it = chunkIds.begin(); it != chunkIds.end(); ++it) { - TString data = PrepareData(dataSize); - testCtx.TestResponce<NPDisk::TEvChunkWriteResult>(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, - *it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), (void*)10, false, 0), - NKikimrProto::OK); - } - } - - mock.CommitReservedChunks(); - - ui32 inflight = 100; - auto& chunkIds = mock.Chunks[EChunkState::COMMITTED]; - - ui64 cookie = 0; - for (auto it = chunkIds.begin(); it != chunkIds.end(); ++it) { - sendManyWrites(mock, *it, inflight, cookie); - sendManyReads(mock, *it, inflight, cookie); - NPDisk::TCommitRecord rec; - rec.DeleteChunks.push_back(*it); - rec.DeleteToDecommitted = true; - logNoTest(mock, rec); - sendManyWrites(mock, *it, inflight, cookie); - sendManyReads(mock, *it, inflight, cookie); - } - mock.Chunks[EChunkState::COMMITTED].clear(); - - - for (ui32 i = 0; i < inflight * 2 * reservedChunks; ++i) { - { - auto res = testCtx.Recv<NPDisk::TEvChunkReadResult>(); - UNIT_ASSERT_VALUES_EQUAL_C(res->Status, NKikimrProto::OK, res->ToString()); - } - { - auto res = testCtx.Recv<NPDisk::TEvChunkWriteResult>(); - UNIT_ASSERT_VALUES_EQUAL_C(res->Status, NKikimrProto::OK, res->ToString()); - } - } - for (ui32 i = 0; i < reservedChunks; ++i) { - testCtx.TestResponce<NPDisk::TEvChunkForgetResult>(new NPDisk::TEvChunkForget(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound), - NKikimrProto::OK); - } - } - } } } // namespace NKikimr diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.cpp new file mode 100644 index 00000000000..3777db36499 --- /dev/null +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.cpp @@ -0,0 +1,78 @@ +#include "blobstorage_pdisk_ut_env.h" + +namespace NKikimr { +void RecreateOwner(TActorTestContext& testCtx, TVDiskIDOwnerRound& vdisk) { + testCtx.TestResponce<NPDisk::TEvSlayResult>( + new NPDisk::TEvSlay(vdisk.VDiskID, vdisk.OwnerRound + 1, 0, 0), + NKikimrProto::OK); + + const auto evInitRes = testCtx.TestResponce<NPDisk::TEvYardInitResult>( + new NPDisk::TEvYardInit(vdisk.OwnerRound + 1, vdisk.VDiskID, testCtx.TestCtx.PDiskGuid), + NKikimrProto::OK); + + vdisk.OwnerRound = evInitRes->PDiskParams->OwnerRound; +} + +void TestChunkWriteReleaseRun() { + TActorTestContext testCtx(false); + + const TVDiskID vDiskID(0, 1, 0, 0, 0); + const auto evInitRes = testCtx.TestResponce<NPDisk::TEvYardInitResult>( + new NPDisk::TEvYardInit(2, vDiskID, testCtx.TestCtx.PDiskGuid), + NKikimrProto::OK); + const auto evReserveRes = testCtx.TestResponce<NPDisk::TEvChunkReserveResult>( + new NPDisk::TEvChunkReserve(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 1), + NKikimrProto::OK); + UNIT_ASSERT(evReserveRes->ChunkIds.size() == 1); + + const ui32 reservedChunk = evReserveRes->ChunkIds.front(); + NPDisk::TCommitRecord commitRecord; + commitRecord.CommitChunks.push_back(reservedChunk); + testCtx.TestResponce<NPDisk::TEvLogResult>( + new NPDisk::TEvLog(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 0, commitRecord, + TString(), TLsnSeg(1, 1), nullptr), + NKikimrProto::OK); + + const auto evControlRes = testCtx.TestResponce<NPDisk::TEvYardControlResult>( + new NPDisk::TEvYardControl(NPDisk::TEvYardControl::GetPDiskPointer, nullptr), + NKikimrProto::OK); + auto *pDisk = reinterpret_cast<NPDisk::TPDisk*>(evControlRes->Cookie); + pDisk->PDiskThread.StopSync(); + + { + NPDisk::TCommitRecord commitRecord; + commitRecord.DeleteChunks.push_back(reservedChunk); + NPDisk::TEvLog ev(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 0, commitRecord, + TString(), TLsnSeg(2, 2), nullptr); + NPDisk::TLogWrite *log = new NPDisk::TLogWrite(ev, testCtx.Sender, 0, {}, {}); + bool ok = pDisk->PreprocessRequest(log); + UNIT_ASSERT(ok); + pDisk->RouteRequest(log); + } + pDisk->ProcessLogWriteQueueAndCommits(); + + { + TString chunkWriteData = PrepareData(1024); + NPDisk::TEvChunkWrite ev(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, reservedChunk, + 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(chunkWriteData), nullptr, false, 0); + NPDisk::TChunkWrite *chunkWrite = new NPDisk::TChunkWrite(ev, testCtx.Sender, {}, {}); + bool ok = pDisk->PreprocessRequest(chunkWrite); + UNIT_ASSERT(!ok); + } + + pDisk->ProcessChunkWriteQueue(); + + testCtx.TestResponce<NPDisk::TEvLogResult>( + nullptr, + NKikimrProto::OK); + testCtx.TestResponce<NPDisk::TEvChunkWriteResult>( + nullptr, + NKikimrProto::ERROR); + + testCtx.Send(new NActors::TEvents::TEvPoisonPill()); +} + +std::atomic<ui64> TVDiskMock::Idx = 0; +std::atomic<ui64> TVDiskMock::OwnerRound = 2; + +}
\ No newline at end of file diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h new file mode 100644 index 00000000000..37d157a2b83 --- /dev/null +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h @@ -0,0 +1,284 @@ +#pragma once +#include "blobstorage_pdisk_ut.h" + +#include "blobstorage_pdisk_abstract.h" +#include "blobstorage_pdisk_impl.h" +#include "blobstorage_pdisk_ut_env.h" + +#include <ydb/core/blobstorage/crypto/default.h> +#include <ydb/core/testlib/actors/test_runtime.h> + +#include <util/system/hp_timer.h> + +namespace NKikimr { + +struct TActorTestContext { +private: + std::optional<TActorId> PDiskActor; + THolder<TTestActorRuntime> Runtime; + std::shared_ptr<NPDisk::IIoContextFactory> IoContext; + NPDisk::TPDisk *PDisk = nullptr; + +public: + TActorId Sender; + NPDisk::TKey MainKey = NPDisk::YdbDefaultPDiskSequence; + TTestContext TestCtx{false, /*use sector map*/ true}; + + TIntrusivePtr<TPDiskConfig> DefaultPDiskConfig(bool isBad) { + TString path; + EntropyPool().Read(&TestCtx.PDiskGuid, sizeof(TestCtx.PDiskGuid)); + ui64 formatGuid = TestCtx.PDiskGuid + static_cast<ui64>(isBad); + FormatPDiskForTest(path, formatGuid, MIN_CHUNK_SIZE, false, TestCtx.SectorMap); + + ui64 pDiskCategory = 0; + TIntrusivePtr<TPDiskConfig> pDiskConfig = new TPDiskConfig(path, TestCtx.PDiskGuid, 1, pDiskCategory); + pDiskConfig->GetDriveDataSwitch = NKikimrBlobStorage::TPDiskConfig::DoNotTouch; + pDiskConfig->WriteCacheSwitch = NKikimrBlobStorage::TPDiskConfig::DoNotTouch; + pDiskConfig->ChunkSize = MIN_CHUNK_SIZE; + pDiskConfig->SectorMap = TestCtx.SectorMap; + pDiskConfig->EnableSectorEncryption = !pDiskConfig->SectorMap; + return pDiskConfig; + } + + TActorTestContext(bool isBad) + : Runtime(new TTestActorRuntime(1, true)) + { + auto appData = MakeHolder<TAppData>(0, 0, 0, 0, TMap<TString, ui32>(), nullptr, nullptr, nullptr, nullptr); + IoContext = std::make_shared<NPDisk::TIoContextFactoryOSS>(); + appData->IoContextFactory = IoContext.get(); + + Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend()); + Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}}); + Runtime->SetLogPriority(NKikimrServices::BS_PDISK, NLog::PRI_NOTICE); + Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SYSLOG, NLog::PRI_NOTICE); + Runtime->SetLogPriority(NKikimrServices::BS_PDISK_TEST, NLog::PRI_DEBUG); + Sender = Runtime->AllocateEdgeActor(); + + TIntrusivePtr<TPDiskConfig> cfg = DefaultPDiskConfig(isBad); + UpdateConfigRecreatePDisk(cfg); + } + + TIntrusivePtr<TPDiskConfig> GetPDiskConfig() { + return GetPDisk()->Cfg; + } + + void UpdateConfigRecreatePDisk(TIntrusivePtr<TPDiskConfig> cfg) { + if (PDiskActor) { + TestResponce<NPDisk::TEvYardControlResult>( + new NPDisk::TEvYardControl(NPDisk::TEvYardControl::PDiskStop, nullptr), + NKikimrProto::OK); + PDisk = nullptr; + Runtime->Send(new IEventHandle(*PDiskActor, Sender, new TKikimrEvents::TEvPoisonPill)); + } + + auto mainCounters = TIntrusivePtr<::NMonitoring::TDynamicCounters>(new ::NMonitoring::TDynamicCounters()); + IActor* pDiskActor = CreatePDisk(cfg.Get(), MainKey, mainCounters); + PDiskActor = Runtime->Register(pDiskActor); + } + + void Send(IEventBase* ev) { + Runtime->Send(new IEventHandle(*PDiskActor, Sender, ev)); + } + + NPDisk::TPDisk *GetPDisk() { + if (!PDisk) { + // To be sure that pdisk actor is in StateOnline + TestResponce<NPDisk::TEvYardControlResult>( + new NPDisk::TEvYardControl(NPDisk::TEvYardControl::PDiskStart, &MainKey), + NKikimrProto::OK); + + const auto evControlRes = TestResponce<NPDisk::TEvYardControlResult>( + new NPDisk::TEvYardControl(NPDisk::TEvYardControl::GetPDiskPointer, nullptr), + NKikimrProto::OK); + PDisk = reinterpret_cast<NPDisk::TPDisk*>(evControlRes->Cookie); + } + return PDisk; + } + + template<typename T> + auto SafeRunOnPDisk(T&& f) { + TGuard<TMutex> g(GetPDisk()->StateMutex); + return f(GetPDisk()); + } + + void RestartPDiskSync() { + TestResponce<NPDisk::TEvYardControlResult>( + new NPDisk::TEvYardControl(NPDisk::TEvYardControl::PDiskStop, nullptr), + NKikimrProto::OK); + PDisk = nullptr; + // wait initialization and update this->PDisk + GetPDisk(); + } + + template<typename TRes> + THolder<TRes> Recv() { + return Runtime->GrabEdgeEvent<TRes>(); + } + + template<typename TRes> + THolder<TRes> TestResponce(IEventBase* ev, NKikimrProto::EReplyStatus status) { + if (ev) { + Send(ev); + } + THolder<TRes> evRes = Recv<TRes>(); + UNIT_ASSERT_C(evRes->Status == status, evRes->ToString()); + UNIT_ASSERT(status == NKikimrProto::OK || !evRes->ErrorReason.empty()); + + // Test that all ToString methods don't VERIFY + Cnull << evRes->ToString(); + return evRes; + } +}; + +struct TVDiskIDOwnerRound { + TVDiskID VDiskID; + ui64 OwnerRound; +}; + +void RecreateOwner(TActorTestContext& testCtx, TVDiskIDOwnerRound& vdisk); + +enum class EChunkState { + UNKNOWN, + RESERVED, + COMMIT_INFLIGHT, + COMMITTED, + DELETE_INFLIGHT, + DELETED +}; + +struct TVDiskMock { + static std::atomic<ui64> Idx; + static std::atomic<ui64> OwnerRound; + + TActorTestContext *TestCtx; + const TVDiskID VDiskID; + TIntrusivePtr<TPDiskParams> PDiskParams; + ui64 LastUsedLsn = 0; + ui64 FirstLsnToKeep = 1; + + TMap<EChunkState, TSet<TChunkIdx>> Chunks; + + TVDiskMock(TActorTestContext *testCtx) + : TestCtx(testCtx) + , VDiskID(Idx.fetch_add(1), 1, 0, 0, 0) + {} + + TLsnSeg GetLsnSeg() { + ++LastUsedLsn; + return {LastUsedLsn, LastUsedLsn}; + }; + + void InitFull() { + Init(); + ReadLog(); + SendEvLogImpl(1, {}, true); + } + + void Init() { + const auto evInitRes = TestCtx->TestResponce<NPDisk::TEvYardInitResult>( + new NPDisk::TEvYardInit(OwnerRound.fetch_add(1), VDiskID, TestCtx->TestCtx.PDiskGuid), + NKikimrProto::OK); + PDiskParams = evInitRes->PDiskParams; + + TSet<TChunkIdx> commited = Chunks[EChunkState::COMMITTED]; + for (TChunkIdx chunk : evInitRes->OwnedChunks) { + UNIT_ASSERT_C(commited.count(chunk), "misowned chunk# " << chunk); + commited.erase(chunk); + } + UNIT_ASSERT_C(commited.empty(), "there are leaked chunks# " << FormatList(commited)); + } + + + void ReserveChunk() { + const auto evReserveRes = TestCtx->TestResponce<NPDisk::TEvChunkReserveResult>( + new NPDisk::TEvChunkReserve(PDiskParams->Owner, PDiskParams->OwnerRound, 1), + NKikimrProto::OK); + UNIT_ASSERT(evReserveRes->ChunkIds.size() == 1); + const ui32 reservedChunk = evReserveRes->ChunkIds.front(); + Chunks[EChunkState::RESERVED].emplace(reservedChunk); + } + + void CommitReservedChunks() { + auto& reservedChunks = Chunks[EChunkState::RESERVED]; + NPDisk::TCommitRecord rec; + rec.CommitChunks = TVector<TChunkIdx>(reservedChunks.begin(), reservedChunks.end()); + SendEvLogImpl(1, rec); + Chunks[EChunkState::COMMITTED].insert(reservedChunks.begin(), reservedChunks.end()); + reservedChunks.clear(); + } + + void DeleteCommitedChunks() { + auto& commited = Chunks[EChunkState::COMMITTED]; + NPDisk::TCommitRecord rec; + rec.DeleteChunks = TVector<TChunkIdx>(commited.begin(), commited.end()); + SendEvLogImpl(1, rec); + Chunks[EChunkState::DELETED].insert(commited.begin(), commited.end()); + commited.clear(); + } + + ui64 ReadLog(std::function<void(const NPDisk::TLogRecord&)> logResCallback = {}) { + ui64 logRecordsRead = 0; + + NPDisk::TLogPosition position{0, 0}; + bool endOfLog = false; + do { + UNIT_ASSERT(PDiskParams); + auto logReadRes = TestCtx->TestResponce<NPDisk::TEvReadLogResult>( + new NPDisk::TEvReadLog(PDiskParams->Owner, PDiskParams->OwnerRound, position), + NKikimrProto::OK); + UNIT_ASSERT(position == logReadRes->Position); + for (const NPDisk::TLogRecord& rec : logReadRes->Results) { + ++logRecordsRead; + if (logResCallback) { + logResCallback(rec); + } + LastUsedLsn = Max(LastUsedLsn, rec.Lsn); + } + position = logReadRes->NextPosition; + endOfLog = logReadRes->IsEndOfLog; + } while (!endOfLog); + + return logRecordsRead; + } + + void SendEvLogSync(const ui64 size = 128) { + SendEvLogImpl(size, {}, false); + } + + void CutLogAllButOne() { + SendEvLogImpl(1, LastUsedLsn + 1, true); + } + + ui64 OwnedLogRecords() const { + return LastUsedLsn + 1 - FirstLsnToKeep; + } + +private: + void SendEvLogImpl(const ui64 size, TMaybe<NPDisk::TCommitRecord> commitRec) { + auto evLog = MakeHolder<NPDisk::TEvLog>(PDiskParams->Owner, PDiskParams->OwnerRound, 0, PrepareData(size), + GetLsnSeg(), nullptr); + + if (commitRec) { + evLog->Signature.SetCommitRecord(); + evLog->CommitRecord = std::move(*commitRec); + } + + TestCtx->TestResponce<NPDisk::TEvLogResult>(evLog.Release(), NKikimrProto::OK); + } + + void SendEvLogImpl(const ui64 size, TMaybe<ui64> firstLsnToKeep, bool isStartingPoint) { + + TMaybe<NPDisk::TCommitRecord> rec; + + if (firstLsnToKeep || isStartingPoint) { + rec = NPDisk::TCommitRecord(); + rec->FirstLsnToKeep = firstLsnToKeep.GetOrElse(0); + FirstLsnToKeep = Max(FirstLsnToKeep, firstLsnToKeep.GetOrElse(0)); + rec->IsStartingPoint = isStartingPoint; + } + SendEvLogImpl(size, rec); + } +}; + +void TestChunkWriteReleaseRun(); +}
\ No newline at end of file diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp new file mode 100644 index 00000000000..c2bfb201f2c --- /dev/null +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp @@ -0,0 +1,297 @@ +#include "blobstorage_pdisk_ut.h" + +#include "blobstorage_pdisk_abstract.h" +#include "blobstorage_pdisk_impl.h" +#include "blobstorage_pdisk_ut_env.h" + +#include <ydb/core/blobstorage/crypto/default.h> +#include <ydb/core/testlib/actors/test_runtime.h> + +#include <util/system/hp_timer.h> + +namespace NKikimr { + +Y_UNIT_TEST_SUITE(TPDiskRaces) { + Y_UNIT_TEST(KillOwnerWhileDeletingChunk) { + THPTimer timer; + ui32 timeLimit = 20; + while (timer.Passed() < timeLimit) { + TActorTestContext testCtx(false); + + auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) { + auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1), + mock.GetLsnSeg(), nullptr); + evLog->Signature.SetCommitRecord(); + evLog->CommitRecord = std::move(rec); + testCtx.Send(evLog.Release()); + }; + + TVDiskMock mock(&testCtx); + mock.Init(); + + ui32 vdisksNum = 100; + std::vector<TVDiskMock> mocks; + for (ui32 i = 0; i < vdisksNum; ++i) { + mocks.push_back(TVDiskMock(&testCtx)); + mocks[i].Init(); + } + + ui32 reservedChunks = 10; + + for (ui32 i = 0; i < reservedChunks; ++i) { + mock.ReserveChunk(); + } + mock.CommitReservedChunks(); + + while (mock.Chunks[EChunkState::COMMITTED].size() > 0) { + auto it = mock.Chunks[EChunkState::COMMITTED].begin(); + NPDisk::TCommitRecord rec; + rec.DeleteChunks.push_back(*it); + logNoTest(mock, rec); + mock.Chunks[EChunkState::COMMITTED].erase(it); + } + + testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound)); + + for (ui32 c = 0; ; c = (c + 1) % mocks.size()) { + testCtx.Send(new NPDisk::TEvChunkReserve(mocks[c].PDiskParams->Owner, mocks[c].PDiskParams->OwnerRound, 1)); + THolder<NPDisk::TEvChunkReserveResult> evRes = testCtx.Recv<NPDisk::TEvChunkReserveResult>(); + if (!evRes || evRes->Status != NKikimrProto::OK) { + break; + } + const ui32 reservedChunk = evRes->ChunkIds.front(); + auto& reservedChunks = mocks[c].Chunks[EChunkState::RESERVED]; + reservedChunks.emplace(reservedChunk); + + NPDisk::TCommitRecord rec; + rec.CommitChunks.push_back(*reservedChunks.begin()); + logNoTest(mocks[c], rec); + reservedChunks.clear(); + } + testCtx.Recv<NPDisk::TEvHarakiriResult>(); + } + } + + Y_UNIT_TEST(KillOwnerWhileDeletingChunkWithInflight) { + THPTimer timer; + ui32 timeLimit = 20; + while (timer.Passed() < timeLimit) { + TActorTestContext testCtx(false); + + auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) { + auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1), + mock.GetLsnSeg(), nullptr); + evLog->Signature.SetCommitRecord(); + evLog->CommitRecord = std::move(rec); + testCtx.Send(evLog.Release()); + }; + + TVDiskMock mock(&testCtx); + mock.Init(); + + ui32 vdisksNum = 100; + std::vector<TVDiskMock> mocks; + for (ui32 i = 0; i < vdisksNum; ++i) { + mocks.push_back(TVDiskMock(&testCtx)); + mocks[i].Init(); + } + + ui32 reservedChunks = 10; + for (ui32 i = 0; i < reservedChunks; ++i) { + mock.ReserveChunk(); + } + mock.CommitReservedChunks(); + TVector<TChunkIdx> chunkIds(mock.Chunks[EChunkState::COMMITTED].begin(), mock.Chunks[EChunkState::COMMITTED].end()); + + ui32 inflight = 50; + + while (mock.Chunks[EChunkState::COMMITTED].size() > 0) { + auto it = mock.Chunks[EChunkState::COMMITTED].begin(); + for (ui32 i = 0; i < inflight; ++i) { + TString data = "HATE. LET ME TELL YOU HOW MUCH I'VE COME TO HATE YOU SINCE I BEGAN TO LIVE..."; + testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, + *it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), nullptr, false, 0)); + } + NPDisk::TCommitRecord rec; + rec.DeleteChunks.push_back(*it); + logNoTest(mock, rec); + mock.Chunks[EChunkState::COMMITTED].erase(it); + } + mock.Chunks[EChunkState::COMMITTED].clear(); + + testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound)); + + for (ui32 c = 0; ; c = (c + 1) % mocks.size()) { + testCtx.Send(new NPDisk::TEvChunkReserve(mocks[c].PDiskParams->Owner, mocks[c].PDiskParams->OwnerRound, 1)); + THolder<NPDisk::TEvChunkReserveResult> evRes = testCtx.Recv<NPDisk::TEvChunkReserveResult>(); + if (!evRes || evRes->Status != NKikimrProto::OK) { + break; + } + const ui32 reservedChunk = evRes->ChunkIds.front(); + auto& reservedChunks = mocks[c].Chunks[EChunkState::RESERVED]; + reservedChunks.emplace(reservedChunk); + + NPDisk::TCommitRecord rec; + rec.CommitChunks.push_back(*reservedChunks.begin()); + logNoTest(mocks[c], rec); + reservedChunks.clear(); + } + testCtx.Recv<NPDisk::TEvHarakiriResult>(); + } + } + + Y_UNIT_TEST(DecommitWithInflight) { + THPTimer timer; + ui32 timeLimit = 20; + while (timer.Passed() < timeLimit) { + TActorTestContext testCtx(false); + ui32 dataSize = 1024; + + auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) { + auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1), + mock.GetLsnSeg(), nullptr); + evLog->Signature.SetCommitRecord(); + evLog->CommitRecord = std::move(rec); + testCtx.Send(evLog.Release()); + }; + + auto sendManyReads = [&](TVDiskMock& mock, TChunkIdx chunk, ui32 number, ui64& cookie) { + for (ui32 i = 0; i < number; ++i) { + testCtx.Send(new NPDisk::TEvChunkRead(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, + chunk, 0, dataSize, 0, (void*)(cookie++))); + } + }; + + auto sendManyWrites = [&](TVDiskMock& mock, TChunkIdx chunk, ui32 number, ui64& cookie) { + for (ui32 i = 0; i < number; ++i) { + TString data = PrepareData(dataSize); + testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, + chunk, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), (void*)(cookie++), false, 0)); + } + }; + + TVDiskMock mock(&testCtx); + mock.Init(); + + ui32 reservedChunks = 10; + for (ui32 i = 0; i < reservedChunks; ++i) { + mock.ReserveChunk(); + } + + { + auto& chunkIds = mock.Chunks[EChunkState::COMMITTED]; + for (auto it = chunkIds.begin(); it != chunkIds.end(); ++it) { + TString data = PrepareData(dataSize); + testCtx.TestResponce<NPDisk::TEvChunkWriteResult>(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, + *it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), (void*)10, false, 0), + NKikimrProto::OK); + } + } + + mock.CommitReservedChunks(); + + ui32 inflight = 50; + auto& chunkIds = mock.Chunks[EChunkState::COMMITTED]; + + ui64 cookie = 0; + for (auto it = chunkIds.begin(); it != chunkIds.end(); ++it) { + sendManyWrites(mock, *it, inflight, cookie); + sendManyReads(mock, *it, inflight, cookie); + NPDisk::TCommitRecord rec; + rec.DeleteChunks.push_back(*it); + rec.DeleteToDecommitted = true; + logNoTest(mock, rec); + sendManyWrites(mock, *it, inflight, cookie); + sendManyReads(mock, *it, inflight, cookie); + } + mock.Chunks[EChunkState::COMMITTED].clear(); + + + for (ui32 i = 0; i < inflight * 2 * reservedChunks; ++i) { + { + auto res = testCtx.Recv<NPDisk::TEvChunkReadResult>(); + UNIT_ASSERT_VALUES_EQUAL_C(res->Status, NKikimrProto::OK, res->ToString()); + } + { + auto res = testCtx.Recv<NPDisk::TEvChunkWriteResult>(); + UNIT_ASSERT_VALUES_EQUAL_C(res->Status, NKikimrProto::OK, res->ToString()); + } + } + for (ui32 i = 0; i < reservedChunks; ++i) { + testCtx.TestResponce<NPDisk::TEvChunkForgetResult>(new NPDisk::TEvChunkForget(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound), + NKikimrProto::OK); + } + } + } + + Y_UNIT_TEST(KillOwnerWhileDecommittingChunksWithInflight) { + THPTimer timer; + ui32 timeLimit = 20; + while (timer.Passed() < timeLimit) { + TActorTestContext testCtx(false); + + auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) { + auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, PrepareData(1), + mock.GetLsnSeg(), nullptr); + evLog->Signature.SetCommitRecord(); + evLog->CommitRecord = std::move(rec); + testCtx.Send(evLog.Release()); + }; + + TVDiskMock mock(&testCtx); + mock.Init(); + + ui32 vdisksNum = 100; + std::vector<TVDiskMock> mocks; + for (ui32 i = 0; i < vdisksNum; ++i) { + mocks.push_back(TVDiskMock(&testCtx)); + mocks[i].Init(); + } + + ui32 reservedChunks = 10; + for (ui32 i = 0; i < reservedChunks; ++i) { + mock.ReserveChunk(); + } + mock.CommitReservedChunks(); + TVector<TChunkIdx> chunkIds(mock.Chunks[EChunkState::COMMITTED].begin(), mock.Chunks[EChunkState::COMMITTED].end()); + + ui32 inflight = 50; + + while (mock.Chunks[EChunkState::COMMITTED].size() > 0) { + auto it = mock.Chunks[EChunkState::COMMITTED].begin(); + for (ui32 i = 0; i < inflight; ++i) { + TString data = "HATE. LET ME TELL YOU HOW MUCH I'VE COME TO HATE YOU SINCE I BEGAN TO LIVE..."; + testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, + *it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), nullptr, false, 0)); + } + NPDisk::TCommitRecord rec; + rec.DeleteChunks.push_back(*it); + rec.DeleteToDecommitted = true; + logNoTest(mock, rec); + mock.Chunks[EChunkState::COMMITTED].erase(it); + } + mock.Chunks[EChunkState::COMMITTED].clear(); + + testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound)); + + for (ui32 c = 0; ; c = (c + 1) % mocks.size()) { + testCtx.Send(new NPDisk::TEvChunkReserve(mocks[c].PDiskParams->Owner, mocks[c].PDiskParams->OwnerRound, 1)); + THolder<NPDisk::TEvChunkReserveResult> evRes = testCtx.Recv<NPDisk::TEvChunkReserveResult>(); + if (!evRes || evRes->Status != NKikimrProto::OK) { + break; + } + const ui32 reservedChunk = evRes->ChunkIds.front(); + auto& reservedChunks = mocks[c].Chunks[EChunkState::RESERVED]; + reservedChunks.emplace(reservedChunk); + + NPDisk::TCommitRecord rec; + rec.CommitChunks.push_back(*reservedChunks.begin()); + logNoTest(mocks[c], rec); + reservedChunks.clear(); + } + testCtx.Recv<NPDisk::TEvHarakiriResult>(); + } + } +} + +} diff --git a/ydb/core/blobstorage/pdisk/ut/CMakeLists.darwin.txt b/ydb/core/blobstorage/pdisk/ut/CMakeLists.darwin.txt index 2412431a31e..056fc7ed795 100644 --- a/ydb/core/blobstorage/pdisk/ut/CMakeLists.darwin.txt +++ b/ydb/core/blobstorage/pdisk/ut/CMakeLists.darwin.txt @@ -34,6 +34,8 @@ target_sources(ydb-core-blobstorage-pdisk-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_crypto_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_helpers.cpp diff --git a/ydb/core/blobstorage/pdisk/ut/CMakeLists.linux.txt b/ydb/core/blobstorage/pdisk/ut/CMakeLists.linux.txt index 41b649904d4..2665fa336a8 100644 --- a/ydb/core/blobstorage/pdisk/ut/CMakeLists.linux.txt +++ b/ydb/core/blobstorage/pdisk/ut/CMakeLists.linux.txt @@ -38,6 +38,8 @@ target_sources(ydb-core-blobstorage-pdisk-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_crypto_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_helpers.cpp |