diff options
author | Cthulhu <cthulhu@ydb.tech> | 2025-02-11 19:24:38 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-11 19:24:38 +0300 |
commit | 4ac9f23d125ed4bbc5321e0051358c5c6049dc1a (patch) | |
tree | 5a220d33d6856b84adbd39041db6805fab9f8c47 | |
parent | 108ab9a4612caa48f0ed14f2c8ddc7c7b57f7376 (diff) | |
download | ydb-4ac9f23d125ed4bbc5321e0051358c5c6049dc1a.tar.gz |
Shred sectors and track dirty in-memory #12483 (#14411)
14 files changed, 335 insertions, 20 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp index 1fe8683792..93398e0a93 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp @@ -404,5 +404,17 @@ void TChunkTrimCompletion::Exec(TActorSystem *actorSystem) { delete this; } +void TChunkShredCompletion::Exec(TActorSystem *actorSystem) { + LOG_TRACE_S(*actorSystem, NKikimrServices::BS_PDISK_SHRED, + "PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << ReqId + << " TChunkShredCompletion Chunk# " << Chunk + << " SectorIdx# " << SectorIdx + << " SizeBytes# " << SizeBytes); + PDisk->Mon.ChunkShred.CountResponse(); + TChunkShredResult *shredResult = PDisk->ReqCreator.CreateFromArgs<TChunkShredResult>(Chunk, SectorIdx, SizeBytes); + PDisk->InputRequest(shredResult); + delete this; +} + } // NPDisk } // NKikimr diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h index bd2d99dfe3..5659eb5f76 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h @@ -359,6 +359,30 @@ public: } }; +class TChunkShredCompletion : public TCompletionAction { + TPDisk *PDisk; + TChunkIdx Chunk; + ui32 SectorIdx; + size_t SizeBytes; + TReqId ReqId; + +public: + TChunkShredCompletion(TPDisk *pdisk, TChunkIdx chunk, ui32 sectorIdx, size_t sizeBytes, TReqId reqId) + : PDisk(pdisk) + , Chunk(chunk) + , SectorIdx(sectorIdx) + , SizeBytes(sizeBytes) + , ReqId(reqId) + {} + + void Exec(TActorSystem *actorSystem) override; + + void Release(TActorSystem *actorSystem) override { + Y_UNUSED(actorSystem); + delete this; + } +}; + class TCompletionSequence : public TCompletionAction { TVector<TCompletionAction*> Actions; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_free_chunks.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_free_chunks.h index da59aa6a19..ee51f096ed 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_free_chunks.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_free_chunks.h @@ -53,9 +53,32 @@ public: return idx; } - ui32 Size() const { - return AtomicGet(FreeChunkCount); + TDeque<TChunkIdx>::const_iterator begin() const { + return FreeChunks.begin(); } + + TDeque<TChunkIdx>::const_iterator end() const { + return FreeChunks.end(); + } + + TChunkIdx PopAt(TDeque<TChunkIdx>::const_iterator it) { + Y_VERIFY(it != FreeChunks.end()); + Y_VERIFY(FreeChunks.size() > 0); + TChunkIdx idx = *it; + FreeChunks.erase(it); + AtomicDecrement(FreeChunkCount); + MonFreeChunks->Dec(); + return idx; + } + + void PushFront(TChunkIdx idx) { + FreeChunks.push_front(idx); + AtomicIncrement(FreeChunkCount); + MonFreeChunks->Inc(); + } + + // A thread-safe function that returns the current number of free chunks. + ui32 Size() const { return AtomicGet(FreeChunkCount); } }; } // NPDisk diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp index 95ff2dc59e..f3151d250b 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp @@ -2577,6 +2577,9 @@ void TPDisk::ProcessFastOperationsQueue() { case ERequestType::RequestMarkDirty: ProcessMarkDirty(static_cast<TMarkDirty&>(*req)); break; + case ERequestType::RequestChunkShredResult: + ProcessChunkShredResult(static_cast<TChunkShredResult&>(*req)); + break; default: Y_FAIL_S("Unexpected request type# " << TypeName(*req)); break; @@ -3187,6 +3190,7 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) { case ERequestType::RequestPreShredCompactVDiskResult: case ERequestType::RequestShredVDiskResult: case ERequestType::RequestMarkDirty: + case ERequestType::RequestChunkShredResult: break; case ERequestType::RequestStopDevice: BlockDevice->Stop(); @@ -3883,6 +3887,7 @@ bool TPDisk::HandleReadOnlyIfWrite(TRequestBase *request) { case ERequestType::RequestPreShredCompactVDiskResult: case ERequestType::RequestShredVDiskResult: case ERequestType::RequestMarkDirty: + case ERequestType::RequestChunkShredResult: // These requests don't require response. return true; } @@ -3913,8 +3918,31 @@ void TPDisk::AddCbsSet(ui32 ownerId) { SchedulerConfigure(conf); } +TChunkIdx TPDisk::GetUnshreddedFreeChunk() { + // Find a free unshredded chunk + for (TFreeChunks* freeChunks : {&Keeper.UntrimmedFreeChunks, &Keeper.TrimmedFreeChunks}) { + for (auto it = freeChunks->begin(); it != freeChunks->end(); ++it) { + TChunkIdx chunkIdx = *it; + TChunkState& state = ChunkState[chunkIdx]; + // Look for free chunks that haven't been shredded in this generation + if (state.CommitState == TChunkState::FREE && state.IsDirty && state.ShredGeneration < ShredGeneration) { + // Found an unshredded free chunk + TChunkIdx unshreddedChunkIdx = freeChunks->PopAt(it); + Y_VERIFY(unshreddedChunkIdx == chunkIdx); + // Mark it as being shredded and update its generation + LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, + "PDisk# " << PCtx->PDiskId + << " found unshredded free chunk# " << chunkIdx + << " ShredGeneration# " << ShredGeneration); + return unshreddedChunkIdx; + } + } + } + return 0; +} + void TPDisk::ProgressShredState() { - LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, + LOG_TRACE_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, "ProgressShredState at PDisk# " << PCtx->PDiskId << " ShredGeneration# " << ShredGeneration << " ShredState# " << (ui32)ShredState); @@ -3959,9 +3987,80 @@ void TPDisk::ProgressShredState() { << " has finished all pre-shred compact VDisk requests" << " ShredGeneration# " << ShredGeneration << " finishedCount# " << finishedCount); + // All preparations are done, no junk chunks can be unmarked, + // Update chunk states and start shredding the empty space + for (TChunkIdx chunkIdx = 0; chunkIdx < ChunkState.size(); ++chunkIdx) { + TChunkState& state = ChunkState[chunkIdx]; + // Update shred generation for all the clean chunks + if (!state.IsDirty) { + state.ShredGeneration = ShredGeneration; + } + } ShredState = EShredStateSendShredVDisk; } if (ShredState == EShredStateSendShredVDisk) { + // Shred free space while possible + if (ChunkBeingShredded == 0) { + ChunkBeingShredded = GetUnshreddedFreeChunk(); + } + if (ChunkBeingShredded != 0) { + // Continue shredding the free chunk + while (true) { + if (ChunkBeingShreddedInFlight >= 2) { + // We have enough in-flight requests, don't start a new one + return; + } + if (ChunkBeingShreddedNextSectorIdx * Format.SectorSize >= Format.ChunkSize) { + ++ChunkBeingShreddedIteration; + ChunkBeingShreddedNextSectorIdx = 0; + } + if (ChunkBeingShreddedIteration >= 2) { + // We have enough iterations, don't start a new one, just wait for the in-flight requests to finish + if (ChunkBeingShreddedInFlight > 0) { + return; + } + // Done shredding the chunk, mark it clean and push it back to the free chunks + LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, "PDisk# " << PCtx->PDiskId + << " is done shredding chunk ChunkBeingShredded# " << ChunkBeingShredded); + TChunkState &state = ChunkState[ChunkBeingShredded]; + state.OperationsInProgress--; + state.IsDirty = false; + state.ShredGeneration = ShredGeneration; + Y_VERIFY(ChunkState[ChunkBeingShredded].OperationsInProgress == 0); + Keeper.UntrimmedFreeChunks.PushFront(ChunkBeingShredded); + ChunkBeingShredded = GetUnshreddedFreeChunk(); + ChunkBeingShreddedIteration = 0; + ChunkBeingShreddedNextSectorIdx = 0; + } + if (ChunkBeingShredded) { + if (ChunkBeingShreddedIteration == 0 && ChunkBeingShreddedNextSectorIdx == 0) { + Y_VERIFY(ChunkState[ChunkBeingShredded].OperationsInProgress == 0); + ChunkState[ChunkBeingShredded].OperationsInProgress++; + } + // Continue shredding the chunk: send a write request to the device using the iteration-specific pattern + THolder<TAlignedData>& payload = ShredPayload[ChunkBeingShreddedIteration]; + if (payload == nullptr) { + payload = MakeHolder<TAlignedData>(Format.RoundUpToSectorSize(2097152)); + ui8* data = payload->Get(); + memset(data, ChunkBeingShreddedIteration == 0 ? 0x55 : 0xaa, payload->Size()); + } + ui64 size = std::min((ui64)Format.ChunkSize - ChunkBeingShreddedNextSectorIdx * Format.SectorSize, (ui64)payload->Size()); + ui64 offset = Format.Offset(ChunkBeingShredded, ChunkBeingShreddedNextSectorIdx); + ui64 reqIdx = ShredReqIdx++; + TCompletionAction *completionAction = new TChunkShredCompletion(this, ChunkBeingShredded, ChunkBeingShreddedNextSectorIdx, size, TReqId(TReqId::ChunkShred, reqIdx)); + ++ChunkBeingShreddedInFlight; + ChunkBeingShreddedNextSectorIdx += size / Format.SectorSize; + Mon.ChunkShred.CountRequest(size); + BlockDevice->PwriteAsync(payload->Get(), size, offset, completionAction, + TReqId(TReqId::ChunkShred, reqIdx), {}); + return; + } + break; + } + } + + // If there are no free chunks unshredded, we should ask a vdisk to shred its free space + ui32 shreddedFreeChunks = Keeper.GetFreeChunkCount(); ui32 finishedCount = 0; for (ui32 ownerId = 0; ownerId < OwnerData.size(); ++ownerId) { TOwnerData &data = OwnerData[ownerId]; @@ -3971,22 +4070,31 @@ void TPDisk::ProgressShredState() { } else if (data.ShredState != TOwnerData::VDISK_SHRED_STATE_SHRED_REQUESTED && data.ShredState != TOwnerData::VDISK_SHRED_STATE_SHRED_FINISHED) { std::vector<TChunkIdx> chunksToShred; - chunksToShred.reserve(ChunkState.size()); + chunksToShred.reserve(shreddedFreeChunks/2); for (TChunkIdx chunkIdx = 0; chunkIdx < ChunkState.size(); ++chunkIdx) { - if (ChunkState[chunkIdx].OwnerId == ownerId) { - // TODO(cthulhu): check if chunk is dirty + TChunkState& state = ChunkState[chunkIdx]; + // We need to shred only chunks that got dirty before the current shred generation + if (state.OwnerId == ownerId && state.IsDirty && state.ShredGeneration < ShredGeneration) { chunksToShred.push_back(chunkIdx); + if (chunksToShred.size() >= shreddedFreeChunks/2) { + break; + } } } - THolder<TEvShredVDisk> shredRequest(new TEvShredVDisk(ShredGeneration, chunksToShred)); - LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, - "PDisk# " << PCtx->PDiskId - << " sends shred request to VDisk# " << data.VDiskId - << " ownerId# " << ownerId - << " request# " << shredRequest->ToString()); - PCtx->ActorSystem->Send(new IEventHandle(data.CutLogId, PCtx->PDiskActor, shredRequest.Release())); - data.ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_REQUESTED; - data.LastShredGeneration = ShredGeneration; + if (chunksToShred.size() > 0) { + THolder<TEvShredVDisk> shredRequest(new TEvShredVDisk(ShredGeneration, chunksToShred)); + LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, + "PDisk# " << PCtx->PDiskId + << " sends shred request to VDisk# " << data.VDiskId + << " ownerId# " << ownerId + << " request# " << shredRequest->ToString()); + PCtx->ActorSystem->Send(new IEventHandle(data.CutLogId, PCtx->PDiskActor, shredRequest.Release())); + data.ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_REQUESTED; + data.LastShredGeneration = ShredGeneration; + } else { + data.ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_FINISHED; + data.LastShredGeneration = ShredGeneration; + } } if (data.ShredState != TOwnerData::VDISK_SHRED_STATE_SHRED_FINISHED) { LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, @@ -4170,12 +4278,52 @@ void TPDisk::ProcessShredVDiskResult(TShredVDiskResult& request) { ShredRequesters.clear(); return; } - OwnerData[request.Owner].ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_FINISHED; + OwnerData[request.Owner].ShredState = TOwnerData::VDISK_SHRED_STATE_COMPACT_FINISHED; ProgressShredState(); } void TPDisk::ProcessMarkDirty(TMarkDirty& request) { - Y_UNUSED(request); + LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, + "ProcessMarkDirty at PDisk# " << PCtx->PDiskId + << " ShredGeneration# " << ShredGeneration + << " request# " << request.ToString()); + { + bool isLogged = false; + ui64 markedDirty = 0; + TGuard<TMutex> guard(StateMutex); + for (auto chunkIdx : request.ChunksToMarkDirty) { + if (chunkIdx >= ChunkState.size()) { + if (!isLogged) { + isLogged = true; + LOG_CRIT_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, + "MarkDirty contains invalid chunkIdx# " << chunkIdx << " for PDisk# " << PCtx->PDiskId + << " ShredGeneration# " << ShredGeneration << " request# " << request.ToString()); + } + } else { + if (!ChunkState[chunkIdx].IsDirty) { + ChunkState[chunkIdx].IsDirty = true; + markedDirty++; + LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, + "PDisk# " << PCtx->PDiskId << " marked chunkIdx# " << chunkIdx << " as dirty" + << " chunk.ShredGeneration# " << ChunkState[chunkIdx].ShredGeneration + << " ShredGeneration# " << ShredGeneration); + } + } + } + if (markedDirty > 0) { + // TODO(cthulhu): save dirty chunks to syslog + } + } +} + +void TPDisk::ProcessChunkShredResult(TChunkShredResult& request) { + LOG_TRACE_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, + "ProcessChunkShredResult at PDisk# " << PCtx->PDiskId + << " ShredGeneration# " << ShredGeneration + << " request# " << request.ToString()); + Y_ABORT_UNLESS(ChunkBeingShreddedInFlight > 0); + --ChunkBeingShreddedInFlight; + ProgressShredState(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h index 94a1110ae3..6b7dcb4f40 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h @@ -157,7 +157,13 @@ public: }; EShredState ShredState = EShredStateDefault; ui64 ShredGeneration = 0; + TChunkIdx ChunkBeingShredded = 0; + ui64 ChunkBeingShreddedIteration = 0; + ui64 ChunkBeingShreddedNextSectorIdx = 0; + ui64 ShredReqIdx = 0; + std::atomic<ui64> ChunkBeingShreddedInFlight = 0; std::deque<std::tuple<TActorId, ui64>> ShredRequesters; + THolder<TAlignedData> ShredPayload[2]; // Chunks that are owned by killed owner, but have operations InFlight TVector<TChunkIdx> QuarantineChunks; @@ -399,11 +405,13 @@ public: void HandleNextWriteMetadata(); void ProcessWriteMetadataResult(TWriteMetadataResult& request); + TChunkIdx GetUnshreddedFreeChunk(); void ProgressShredState(); void ProcessShredPDisk(TShredPDisk& request); void ProcessPreShredCompactVDiskResult(TPreShredCompactVDiskResult& request); void ProcessShredVDiskResult(TShredVDiskResult& request); void ProcessMarkDirty(TMarkDirty& request); + void ProcessChunkShredResult(TChunkShredResult& request); void DropAllMetadataRequests(); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp index 09ea4f50d3..50f188854a 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp @@ -1038,6 +1038,10 @@ NKikimrProto::EReplyStatus TPDisk::BeforeLoggingCommitRecord(const TLogWrite &lo if (logWrite.CommitRecord.DeleteToDecommitted) { for (ui32 chunkIdx : logWrite.CommitRecord.DeleteChunks) { TChunkState& state = ChunkState[chunkIdx]; + if (!state.IsDirty) { + // TODO(cthulhu): log that chunk got dirty + state.IsDirty = true; + } switch (state.CommitState) { case TChunkState::DATA_RESERVED: state.CommitState = TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS; @@ -1057,6 +1061,10 @@ NKikimrProto::EReplyStatus TPDisk::BeforeLoggingCommitRecord(const TLogWrite &lo } else { for (ui32 chunkIdx : logWrite.CommitRecord.DeleteChunks) { TChunkState& state = ChunkState[chunkIdx]; + if (!state.IsDirty) { + // TODO(cthulhu): log that chunk got dirty + state.IsDirty = true; + } if (state.HasAnyOperationsInProgress()) { switch (state.CommitState) { case TChunkState::DATA_RESERVED: diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_keeper.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_keeper.h index 313d7acfd0..54dfa6e207 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_keeper.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_keeper.h @@ -26,6 +26,8 @@ protected: TFreeChunks TrimmedFreeChunks; // Trimmed free chunk list for fast allocation TChunkTracker ChunkTracker; + + friend class TPDisk; public: TKeeper(TPDiskMon &mon, TIntrusivePtr<TPDiskConfig> cfg) diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp index 24d1b19232..d9107d9186 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp @@ -235,6 +235,7 @@ TPDiskMon::TPDiskMon(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& count IO_REQ_INIT(PDiskGroup, WriteHuge, WriteHuge); IO_REQ_INIT(PDiskGroup, WriteComp, WriteComp); IO_REQ_INIT(PDiskGroup, Trim, WriteTrim); + IO_REQ_INIT(PDiskGroup, ChunkShred, WriteShred); IO_REQ_INIT_IF_EXTENDED(PDiskGroup, ReadSyncLog, ReadSyncLog); IO_REQ_INIT_IF_EXTENDED(PDiskGroup, ReadComp, ReadComp); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h index 360422822f..347b35e5df 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h @@ -479,7 +479,7 @@ struct TPDiskMon { TIoCounters WriteHuge; TIoCounters WriteComp; TIoCounters Trim; - + TIoCounters ChunkShred; TIoCounters ReadSyncLog; TIoCounters ReadComp; TIoCounters ReadOnlineRt; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h index ace05ce80f..7a72906294 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h @@ -93,6 +93,8 @@ struct TReqId { PreShredCompactVDiskResult = 74, ShredVDiskResult = 75, MarkDirty = 76, + ChunkShred = 77, + ChunkShredResult = 78, }; // 56 bit idx, 8 bit source @@ -161,6 +163,7 @@ enum class ERequestType { RequestPreShredCompactVDiskResult, RequestShredVDiskResult, RequestMarkDirty, + RequestChunkShredResult, }; inline IOutputStream& operator <<(IOutputStream& out, const TReqId& reqId) { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h index a5f75eae97..97119c34a5 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h @@ -1018,6 +1018,36 @@ public: } }; +// +// TChunkShredResult +// +class TChunkShredResult : public TRequestBase { +public: + TChunkIdx Chunk; + ui32 SectorIdx; + ui64 ShredSize; + + TChunkShredResult(TChunkIdx chunk, ui32 sectorIdx, ui64 shredSize, TAtomicBase reqIdx) + : TRequestBase(TActorId(), TReqId(TReqId::ChunkShredResult, reqIdx), OwnerSystem, 0, NPriInternal::Other, {}) + , Chunk(chunk) + , SectorIdx(sectorIdx) + , ShredSize(shredSize) + {} + + ERequestType GetType() const override { + return ERequestType::RequestChunkShredResult; + } + + TString ToString() const { + TStringStream str; + str << "TChunkShredResult { Chunk# " << Chunk + << " SectorIdx# " << SectorIdx + << " ShredSize# " << ShredSize << "}"; + return str.Str(); + } +}; + + class TStopDevice : public TRequestBase { public: TStopDevice(TAtomicBase reqIdx) diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h index 86efd87864..56fa758383 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h @@ -279,8 +279,9 @@ struct TChunkState { std::atomic<i64> OperationsInProgress; TOwner OwnerId; ECommitState CommitState; + std::atomic<bool> IsDirty; ui64 CommitsInProgress; - + ui64 ShredGeneration; TChunkState() : Nonce(0) , CurrentNonce(0) @@ -288,7 +289,9 @@ struct TChunkState { , OperationsInProgress(0) , OwnerId(OwnerUnallocated) , CommitState(FREE) + , IsDirty(false) , CommitsInProgress(0) + , ShredGeneration(0) {} bool HasAnyOperationsInProgress() const { @@ -308,7 +311,9 @@ struct TChunkState { OUT_VAR(OperationsInProgress.load()); OUT_VAR(OwnerId); OUT_VAR(CommitState); + OUT_VAR(IsDirty.load()); OUT_VAR(CommitsInProgress); + OUT_VAR(ShredGeneration); str << "}"; return str.Str(); } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp index f0e104abd6..36505c86fa 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp @@ -1448,6 +1448,21 @@ Y_UNIT_TEST_SUITE(ShredPDisk) { vdisk.SendEvLogSync(); testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration)); vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, ""); + THolder<NPDisk::TEvShredPDiskResult> res = testCtx.TestResponse<NPDisk::TEvShredPDiskResult>(nullptr, NKikimrProto::OK); + UNIT_ASSERT_VALUES_EQUAL(res->ErrorReason, ""); + UNIT_ASSERT_VALUES_EQUAL(res->ShredGeneration, shredGeneration); + } + Y_UNIT_TEST(SimpleShredDirtyChunks) { + ui64 shredGeneration = 1; + TActorTestContext testCtx{{}}; + TVDiskMock vdisk(&testCtx); + vdisk.InitFull(); + vdisk.SendEvLogSync(); + vdisk.ReserveChunk(); + vdisk.CommitReservedChunks(); + vdisk.MarkCommitedChunksDirty(); + testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration)); + vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, ""); vdisk.RespondToShred(shredGeneration, NKikimrProto::OK, ""); THolder<NPDisk::TEvShredPDiskResult> res = testCtx.TestResponse<NPDisk::TEvShredPDiskResult>(nullptr, NKikimrProto::OK); UNIT_ASSERT_VALUES_EQUAL(res->ErrorReason, ""); @@ -1474,6 +1489,9 @@ Y_UNIT_TEST_SUITE(ShredPDisk) { TVDiskMock vdisk(&testCtx); vdisk.InitFull(); vdisk.SendEvLogSync(); + vdisk.ReserveChunk(); + vdisk.CommitReservedChunks(); + vdisk.MarkCommitedChunksDirty(); testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration)); vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, ""); THolder<NPDisk::TEvShredVDisk> evReq = testCtx.Recv<NPDisk::TEvShredVDisk>(); @@ -1494,11 +1512,18 @@ Y_UNIT_TEST_SUITE(ShredPDisk) { vdisk2.InitFull(); vdisk2.SendEvLogSync(); testCtx.RestartPDiskSync(); + vdisk.InitFull(); vdisk.SendEvLogSync(); + vdisk.ReserveChunk(); + vdisk.CommitReservedChunks(); + vdisk.MarkCommitedChunksDirty(); testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration)); vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, ""); vdisk2.InitFull(); + vdisk2.ReserveChunk(); + vdisk2.CommitReservedChunks(); + vdisk2.MarkCommitedChunksDirty(); vdisk2.SendEvLogSync(); vdisk2.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, ""); vdisk.RespondToShred(shredGeneration, NKikimrProto::OK, ""); @@ -1513,6 +1538,9 @@ Y_UNIT_TEST_SUITE(ShredPDisk) { TVDiskMock vdisk(&testCtx); vdisk.InitFull(); vdisk.SendEvLogSync(); + vdisk.ReserveChunk(); + vdisk.CommitReservedChunks(); + vdisk.MarkCommitedChunksDirty(); testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration)); THolder<NPDisk::TEvPreShredCompactVDisk> evReq = testCtx.Recv<NPDisk::TEvPreShredCompactVDisk>(); UNIT_ASSERT_VALUES_UNEQUAL(evReq.Get(), nullptr); @@ -1530,6 +1558,9 @@ Y_UNIT_TEST_SUITE(ShredPDisk) { TVDiskMock vdisk(&testCtx); vdisk.InitFull(); vdisk.SendEvLogSync(); + vdisk.ReserveChunk(); + vdisk.CommitReservedChunks(); + vdisk.MarkCommitedChunksDirty(); testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration)); vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, ""); THolder<NPDisk::TEvShredVDisk> evReq = testCtx.Recv<NPDisk::TEvShredVDisk>(); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h index 687dee85f9..130643646b 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h @@ -279,7 +279,6 @@ struct TVDiskMock { UNIT_ASSERT_C(commited.empty(), "there are leaked chunks# " << FormatList(commited)); } - void ReserveChunk() { const auto evReserveRes = TestCtx->TestResponse<NPDisk::TEvChunkReserveResult>( new NPDisk::TEvChunkReserve(PDiskParams->Owner, PDiskParams->OwnerRound, 1), @@ -298,6 +297,16 @@ struct TVDiskMock { reservedChunks.clear(); } + void MarkCommitedChunksDirty() { + auto& commited = Chunks[EChunkState::COMMITTED]; + TStackVec<TChunkIdx, 1> chunksToMark; + for (auto it = commited.begin(); it != commited.end(); ++it) { + chunksToMark.push_back(*it); + } + auto evMark = MakeHolder<NPDisk::TEvMarkDirty>(PDiskParams->Owner, PDiskParams->OwnerRound, chunksToMark); + TestCtx->Send(evMark.Release()); + } + void DeleteCommitedChunks() { auto& commited = Chunks[EChunkState::COMMITTED]; NPDisk::TCommitRecord rec; @@ -366,6 +375,17 @@ struct TVDiskMock { void RespondToShred(ui64 shredGeneration, NKikimrProto::EReplyStatus status, const TString& errorReason) { THolder<NPDisk::TEvShredVDisk> evReq = TestCtx->Recv<NPDisk::TEvShredVDisk>(); if (evReq) { + auto& commited = Chunks[EChunkState::COMMITTED]; + NPDisk::TCommitRecord rec; + rec.DeleteChunks = TVector<TChunkIdx>(); + for (const TChunkIdx &idx : evReq->ChunksToShred) { + if (commited.contains(idx)) { + rec.DeleteChunks.push_back(idx); + Chunks[EChunkState::DELETED].insert(idx); + commited.erase(idx); + } + } + SendEvLogImpl(1, rec); TestCtx->Send(new NPDisk::TEvShredVDiskResult(PDiskParams->Owner, PDiskParams->OwnerRound, shredGeneration, status, errorReason)); } |