diff options
author | Vlad Kuznetsov <va-kuznecov@ydb.tech> | 2024-10-03 19:24:33 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-03 19:24:33 +0200 |
commit | 595af5f38863f2ac75972ffed6fcd498443ad022 (patch) | |
tree | c4a59a9d85a52e8650f4b34ee624d7497075133f | |
parent | 23532724efd93953c12105fd6bc1ee1e4552e445 (diff) | |
download | ydb-595af5f38863f2ac75972ffed6fcd498443ad022.tar.gz |
Rework FormatRead and SysLogRead, move their execution to PDisk to avoid race (#10009)
7 files changed, 46 insertions, 86 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp index 38c90188fa..9eda6940d6 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp @@ -280,7 +280,7 @@ public: str << " Config: " << Cfg->ToString(); P_LOG(PRI_CRIT, BPD01, str.Str()); } else { - PDisk->InitiateReadSysLog(SelfId()); + PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TReadFormat>(SelfId())); StateErrorReason = "PDisk is in StateInit, wait for PDisk to read sys log. Did you ckeck EvYardInit result? Marker# BSY09"; Become(&TThis::StateInit); @@ -557,8 +557,8 @@ public: // PDisk GUID is OK and format is complete *PDisk->Mon.PDiskState = NKikimrBlobStorage::TPDiskState::InitialSysLogRead; *PDisk->Mon.PDiskDetailedState = TPDiskMon::TPDisk::BootingSysLogRead; - PDisk->Format.InitMagic(); - PDisk->ReadSysLog(SelfId()); + + PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TReadSysLog>(SelfId())); } } } @@ -1083,7 +1083,7 @@ public: } Send(ev->Sender, new TEvBlobStorage::TEvNotifyWardenPDiskRestarted(PCtx->PDiskId, NKikimrProto::EReplyStatus::NOTREADY)); - + return; } @@ -1096,7 +1096,7 @@ public: NPDisk::TMainKey newMainKey = ev->Get()->MainKey; SecureWipeBuffer((ui8*)ev->Get()->MainKey.Keys.data(), sizeof(NPDisk::TKey) * ev->Get()->MainKey.Keys.size()); - + P_LOG(PRI_NOTICE, BSP01, "Going to restart PDisk since received TEvAskWardenRestartPDiskResult"); const TActorIdentity& thisActorId = SelfId(); @@ -1109,7 +1109,7 @@ public: TIntrusivePtr<TPDiskConfig> actorCfg = std::move(Cfg); auto& newCfg = ev->Get()->Config; - + if (newCfg) { Y_VERIFY_S(newCfg->PDiskId == pdiskId, "New config's PDiskId# " << newCfg->PDiskId << " is not equal to real PDiskId# " << pdiskId); @@ -1124,7 +1124,7 @@ public: TGenericExecutorThread& executorThread = actorCtx.ExecutorThread; PassAway(); - + CreatePDiskActor(executorThread, counters, actorCfg, newMainKey, pdiskId, poolId, nodeId); Send(ev->Sender, new TEvBlobStorage::TEvNotifyWardenPDiskRestarted(pdiskId)); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp index aebfb7c78c..9068feaa67 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp @@ -173,69 +173,6 @@ void WaitForValue(TAtomic *counter, TDuration maxDuration, TAtomicBase expectedV } } -void RunTestMultipleRequestsFromCompletionAction() { - const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters; - THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr)); - const ui32 dataSize = 4 << 10; - const ui64 generations = 8; - TAtomic counter = 0; - - - TTempDir tempDir; - TString path = CreateFile(tempDir().c_str(), dataSize); - - { - TActorSystemCreator creator; - THolder<NPDisk::TBufferPool> bufferPool(NPDisk::CreateBufferPool(dataSize, 1, false, {})); - NPDisk::TBuffer::TPtr alignedBuffer(bufferPool->Pop()); - memset(alignedBuffer->Data(), 0, dataSize); - THolder<NPDisk::IBlockDevice> device(NPDisk::CreateRealBlockDevice(path, *mon, 0, 0, 4, - NPDisk::TDeviceMode::LockFile, 2 << generations, nullptr)); - device->Initialize(std::make_shared<NPDisk::TPDiskCtx>(creator.GetActorSystem())); - - (new TWriter(*device, alignedBuffer.Get(), (i32)generations, &counter))->Exec(nullptr); - - TAtomicBase expectedCounter = 0; - for (ui64 i = 0; i <= generations; ++i) { - expectedCounter += 1ull << i; - } - WaitForValue(&counter, TIMEOUT, expectedCounter); - - TAtomicBase resultingCounter = AtomicGet(counter); - - UNIT_ASSERT_VALUES_EQUAL( - resultingCounter, - expectedCounter - ); - } - Ctest << "Done" << Endl; -} - -void RunTestDestructionWithMultipleFlushesFromCompletionAction() { - const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters; - THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr)); - const ui32 dataSize = 4 << 10; - const i32 generations = 8; - TAtomic counter = 0; - - TTempDir tempDir; - TString path = CreateFile(tempDir().c_str(), dataSize); - - TActorSystemCreator creator; - THolder<NPDisk::IBlockDevice> device(NPDisk::CreateRealBlockDevice(path, *mon, 0, 0, 4, - NPDisk::TDeviceMode::LockFile, 2 << generations, nullptr)); - device->Initialize(std::make_shared<NPDisk::TPDiskCtx>(creator.GetActorSystem())); - - (new TFlusher(*device, generations, &counter))->Exec(nullptr); - device->Stop(); - for (int i = 0; i < 10000; ++i) { - (new TFlusher(*device, generations, &counter))->Exec(nullptr); - } - device.Destroy(); - - Ctest << "Done" << Endl; -} - void RunWriteTestWithSectorMap(NPDisk::NSectorMap::EDiskMode diskMode, ui32 diskSize, ui32 bufferSize, bool sequential = true) { const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters; THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr)); @@ -264,14 +201,6 @@ void RunWriteTestWithSectorMap(NPDisk::NSectorMap::EDiskMode diskMode, ui32 disk Y_UNIT_TEST_SUITE(TBlockDeviceTest) { - Y_UNIT_TEST(TestMultipleRequestsFromCompletionAction) { - RunTestMultipleRequestsFromCompletionAction(); - } - - Y_UNIT_TEST(TestDestructionWithMultipleFlushesFromCompletionAction) { - RunTestDestructionWithMultipleFlushesFromCompletionAction(); - } - Y_UNIT_TEST(TestDeviceWithSubmitGetThread) { const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters; THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr)); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp index 0ab6abf307..1a16e1d47f 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp @@ -2487,6 +2487,11 @@ void TPDisk::ProcessFastOperationsQueue() { break; case ERequestType::RequestContinueReadMetadata: static_cast<TContinueReadMetadata&>(*req).Execute(PCtx->ActorSystem); + case ERequestType::RequestReadFormat: + InitiateReadFormat(); + break; + case ERequestType::RequestReadSysLog: + InitiateReadSysLog(); break; default: Y_FAIL_S("Unexpected request type# " << (ui64)req->GetType()); @@ -3082,6 +3087,8 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) { case ERequestType::RequestReadMetadata: case ERequestType::RequestWriteMetadata: case ERequestType::RequestContinueReadMetadata: + case ERequestType::RequestReadFormat: + case ERequestType::RequestReadSysLog: break; case ERequestType::RequestStopDevice: BlockDevice->Stop(); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h index e571605489..c544fcc3c8 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h @@ -211,7 +211,7 @@ public: bool IsFormatMagicValid(ui8 *magicData, ui32 magicDataSize, const TMainKey& mainKey); // Called by actor bool CheckGuid(TString *outReason); // Called by actor bool CheckFormatComplete(); // Called by actor - void ReadSysLog(const TActorId &pDiskActor); // Called by actor + void InitiateReadSysLog(); bool ProcessChunk0(const TEvReadLogResult &readLogResult, TString& errorReason); void PrintChunksDebugInfo(); TRcBuf ProcessReadSysLogResult(ui64 &outWritePosition, ui64 &outLsn, const TEvReadLogResult &readLogResult); @@ -394,9 +394,8 @@ public: // Internal interface // Schedules EvReadLogResult event for the system log - void ResetInit(); bool Initialize(); // Called by actor - void InitiateReadSysLog(const TActorId &pDiskActor); // Called by actor + void InitiateReadFormat(); void ProcessReadLogResult(const TEvReadLogResult &evReadLogResult, const TActorId &pDiskActor); NKikimrProto::EReplyStatus ValidateRequest(TLogWrite *logWrite, TStringStream& outErrorReason); @@ -432,4 +431,3 @@ bool ParseSectorOffset(const TDiskFormat& format, TActorSystem *actorSystem, ui3 } // NPDisk } // NKikimr - diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp index 8bd2e20ced..8681d943e5 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp @@ -205,8 +205,10 @@ void TPDisk::GetStartingPoints(NPDisk::TOwner owner, TMap<TLogSignature, NPDisk: } } -void TPDisk::ReadSysLog(const TActorId &pDiskActor) { - TIntrusivePtr<TSysLogReader> sysLogReader(new TSysLogReader(this, PCtx->ActorSystem, pDiskActor, +void TPDisk::InitiateReadSysLog() { + Format.InitMagic(); + + TIntrusivePtr<TSysLogReader> sysLogReader(new TSysLogReader(this, PCtx->ActorSystem, PCtx->PDiskActor, TReqId(TReqId::ReadSysLog, 0))); sysLogReader->Start(); return; @@ -1310,7 +1312,7 @@ void TPDisk::MarkChunksAsReleased(TReleaseChunks& req) { } // Schedules EvReadLogResult event for the system log -void TPDisk::InitiateReadSysLog(const TActorId &pDiskActor) { +void TPDisk::InitiateReadFormat() { Y_VERIFY_S(PDiskThread.Running(), "expect PDiskThread to be running"); Y_VERIFY_S(InitPhase == EInitPhase::Uninitialized, "expect InitPhase to be Uninitialized, but InitPhase# " << InitPhase); @@ -1318,7 +1320,7 @@ void TPDisk::InitiateReadSysLog(const TActorId &pDiskActor) { THolder<TEvReadFormatResult> evReadFormatResult(new TEvReadFormatResult(formatSectorsSize, UseHugePages)); ui8 *formatSectors = evReadFormatResult->FormatSectors.Get(); BlockDevice->PreadAsync(formatSectors, formatSectorsSize, 0, - new TCompletionEventSender(this, pDiskActor, evReadFormatResult.Release()), TReqId(TReqId::InitialFormatRead, 0), {}); + new TCompletionEventSender(this, PCtx->PDiskActor, evReadFormatResult.Release()), TReqId(TReqId::InitialFormatRead, 0), {}); *Mon.PDiskState = NKikimrBlobStorage::TPDiskState::InitialFormatRead; *Mon.PDiskDetailedState = TPDiskMon::TPDisk::BootingFormatRead; InitPhase = EInitPhase::ReadingSysLog; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h index f186544538..f25d8206bc 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h @@ -153,6 +153,8 @@ enum class ERequestType { RequestWriteMetadataResult, RequestPushUnformattedMetadataSector, RequestContinueReadMetadata, + RequestReadFormat, + RequestReadSysLog, }; inline IOutputStream& operator <<(IOutputStream& out, const TReqId& reqId) { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h index bd7dea1db7..1310c28744 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h @@ -119,6 +119,28 @@ public: static void AbortDelete(TRequestBase* request, TActorSystem* actorSystem); }; +class TReadFormat : public TRequestBase { +public: + TReadFormat(TActorId pdiskActor, TAtomicBase reqIdx) + : TRequestBase(pdiskActor, TReqId(TReqId::ReadFormatInfo, reqIdx), 0, 0, NPriInternal::Other) + {} + + ERequestType GetType() const override { + return ERequestType::RequestReadFormat; + } +}; + +class TReadSysLog : public TRequestBase { +public: + TReadSysLog(TActorId pdiskActor, TAtomicBase reqIdx) + : TRequestBase(pdiskActor, TReqId(TReqId::ReadSysLog, reqIdx), 0, 0, NPriInternal::Other) + {} + + ERequestType GetType() const override { + return ERequestType::RequestReadSysLog; + } +}; + // // TYardInit // |