aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCthulhu <cthulhu@ydb.tech>2025-02-11 19:24:38 +0300
committerGitHub <noreply@github.com>2025-02-11 19:24:38 +0300
commit4ac9f23d125ed4bbc5321e0051358c5c6049dc1a (patch)
tree5a220d33d6856b84adbd39041db6805fab9f8c47
parent108ab9a4612caa48f0ed14f2c8ddc7c7b57f7376 (diff)
downloadydb-4ac9f23d125ed4bbc5321e0051358c5c6049dc1a.tar.gz
Shred sectors and track dirty in-memory #12483 (#14411)
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp12
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h24
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_free_chunks.h27
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp178
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h8
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp8
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_keeper.h2
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp1
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h2
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h3
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h30
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h7
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp31
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h22
14 files changed, 335 insertions, 20 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp
index 1fe8683792..93398e0a93 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp
@@ -404,5 +404,17 @@ void TChunkTrimCompletion::Exec(TActorSystem *actorSystem) {
delete this;
}
+void TChunkShredCompletion::Exec(TActorSystem *actorSystem) {
+ LOG_TRACE_S(*actorSystem, NKikimrServices::BS_PDISK_SHRED,
+ "PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << ReqId
+ << " TChunkShredCompletion Chunk# " << Chunk
+ << " SectorIdx# " << SectorIdx
+ << " SizeBytes# " << SizeBytes);
+ PDisk->Mon.ChunkShred.CountResponse();
+ TChunkShredResult *shredResult = PDisk->ReqCreator.CreateFromArgs<TChunkShredResult>(Chunk, SectorIdx, SizeBytes);
+ PDisk->InputRequest(shredResult);
+ delete this;
+}
+
} // NPDisk
} // NKikimr
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h
index bd2d99dfe3..5659eb5f76 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h
@@ -359,6 +359,30 @@ public:
}
};
+class TChunkShredCompletion : public TCompletionAction {
+ TPDisk *PDisk;
+ TChunkIdx Chunk;
+ ui32 SectorIdx;
+ size_t SizeBytes;
+ TReqId ReqId;
+
+public:
+ TChunkShredCompletion(TPDisk *pdisk, TChunkIdx chunk, ui32 sectorIdx, size_t sizeBytes, TReqId reqId)
+ : PDisk(pdisk)
+ , Chunk(chunk)
+ , SectorIdx(sectorIdx)
+ , SizeBytes(sizeBytes)
+ , ReqId(reqId)
+ {}
+
+ void Exec(TActorSystem *actorSystem) override;
+
+ void Release(TActorSystem *actorSystem) override {
+ Y_UNUSED(actorSystem);
+ delete this;
+ }
+};
+
class TCompletionSequence : public TCompletionAction {
TVector<TCompletionAction*> Actions;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_free_chunks.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_free_chunks.h
index da59aa6a19..ee51f096ed 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_free_chunks.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_free_chunks.h
@@ -53,9 +53,32 @@ public:
return idx;
}
- ui32 Size() const {
- return AtomicGet(FreeChunkCount);
+ TDeque<TChunkIdx>::const_iterator begin() const {
+ return FreeChunks.begin();
}
+
+ TDeque<TChunkIdx>::const_iterator end() const {
+ return FreeChunks.end();
+ }
+
+ TChunkIdx PopAt(TDeque<TChunkIdx>::const_iterator it) {
+ Y_VERIFY(it != FreeChunks.end());
+ Y_VERIFY(FreeChunks.size() > 0);
+ TChunkIdx idx = *it;
+ FreeChunks.erase(it);
+ AtomicDecrement(FreeChunkCount);
+ MonFreeChunks->Dec();
+ return idx;
+ }
+
+ void PushFront(TChunkIdx idx) {
+ FreeChunks.push_front(idx);
+ AtomicIncrement(FreeChunkCount);
+ MonFreeChunks->Inc();
+ }
+
+ // A thread-safe function that returns the current number of free chunks.
+ ui32 Size() const { return AtomicGet(FreeChunkCount); }
};
} // NPDisk
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
index 95ff2dc59e..f3151d250b 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
@@ -2577,6 +2577,9 @@ void TPDisk::ProcessFastOperationsQueue() {
case ERequestType::RequestMarkDirty:
ProcessMarkDirty(static_cast<TMarkDirty&>(*req));
break;
+ case ERequestType::RequestChunkShredResult:
+ ProcessChunkShredResult(static_cast<TChunkShredResult&>(*req));
+ break;
default:
Y_FAIL_S("Unexpected request type# " << TypeName(*req));
break;
@@ -3187,6 +3190,7 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {
case ERequestType::RequestPreShredCompactVDiskResult:
case ERequestType::RequestShredVDiskResult:
case ERequestType::RequestMarkDirty:
+ case ERequestType::RequestChunkShredResult:
break;
case ERequestType::RequestStopDevice:
BlockDevice->Stop();
@@ -3883,6 +3887,7 @@ bool TPDisk::HandleReadOnlyIfWrite(TRequestBase *request) {
case ERequestType::RequestPreShredCompactVDiskResult:
case ERequestType::RequestShredVDiskResult:
case ERequestType::RequestMarkDirty:
+ case ERequestType::RequestChunkShredResult:
// These requests don't require response.
return true;
}
@@ -3913,8 +3918,31 @@ void TPDisk::AddCbsSet(ui32 ownerId) {
SchedulerConfigure(conf);
}
+TChunkIdx TPDisk::GetUnshreddedFreeChunk() {
+ // Find a free unshredded chunk
+ for (TFreeChunks* freeChunks : {&Keeper.UntrimmedFreeChunks, &Keeper.TrimmedFreeChunks}) {
+ for (auto it = freeChunks->begin(); it != freeChunks->end(); ++it) {
+ TChunkIdx chunkIdx = *it;
+ TChunkState& state = ChunkState[chunkIdx];
+ // Look for free chunks that haven't been shredded in this generation
+ if (state.CommitState == TChunkState::FREE && state.IsDirty && state.ShredGeneration < ShredGeneration) {
+ // Found an unshredded free chunk
+ TChunkIdx unshreddedChunkIdx = freeChunks->PopAt(it);
+ Y_VERIFY(unshreddedChunkIdx == chunkIdx);
+ // Mark it as being shredded and update its generation
+ LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
+ "PDisk# " << PCtx->PDiskId
+ << " found unshredded free chunk# " << chunkIdx
+ << " ShredGeneration# " << ShredGeneration);
+ return unshreddedChunkIdx;
+ }
+ }
+ }
+ return 0;
+}
+
void TPDisk::ProgressShredState() {
- LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
+ LOG_TRACE_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
"ProgressShredState at PDisk# " << PCtx->PDiskId
<< " ShredGeneration# " << ShredGeneration
<< " ShredState# " << (ui32)ShredState);
@@ -3959,9 +3987,80 @@ void TPDisk::ProgressShredState() {
<< " has finished all pre-shred compact VDisk requests"
<< " ShredGeneration# " << ShredGeneration
<< " finishedCount# " << finishedCount);
+ // All preparations are done, no junk chunks can be unmarked,
+ // Update chunk states and start shredding the empty space
+ for (TChunkIdx chunkIdx = 0; chunkIdx < ChunkState.size(); ++chunkIdx) {
+ TChunkState& state = ChunkState[chunkIdx];
+ // Update shred generation for all the clean chunks
+ if (!state.IsDirty) {
+ state.ShredGeneration = ShredGeneration;
+ }
+ }
ShredState = EShredStateSendShredVDisk;
}
if (ShredState == EShredStateSendShredVDisk) {
+ // Shred free space while possible
+ if (ChunkBeingShredded == 0) {
+ ChunkBeingShredded = GetUnshreddedFreeChunk();
+ }
+ if (ChunkBeingShredded != 0) {
+ // Continue shredding the free chunk
+ while (true) {
+ if (ChunkBeingShreddedInFlight >= 2) {
+ // We have enough in-flight requests, don't start a new one
+ return;
+ }
+ if (ChunkBeingShreddedNextSectorIdx * Format.SectorSize >= Format.ChunkSize) {
+ ++ChunkBeingShreddedIteration;
+ ChunkBeingShreddedNextSectorIdx = 0;
+ }
+ if (ChunkBeingShreddedIteration >= 2) {
+ // We have enough iterations, don't start a new one, just wait for the in-flight requests to finish
+ if (ChunkBeingShreddedInFlight > 0) {
+ return;
+ }
+ // Done shredding the chunk, mark it clean and push it back to the free chunks
+ LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, "PDisk# " << PCtx->PDiskId
+ << " is done shredding chunk ChunkBeingShredded# " << ChunkBeingShredded);
+ TChunkState &state = ChunkState[ChunkBeingShredded];
+ state.OperationsInProgress--;
+ state.IsDirty = false;
+ state.ShredGeneration = ShredGeneration;
+ Y_VERIFY(ChunkState[ChunkBeingShredded].OperationsInProgress == 0);
+ Keeper.UntrimmedFreeChunks.PushFront(ChunkBeingShredded);
+ ChunkBeingShredded = GetUnshreddedFreeChunk();
+ ChunkBeingShreddedIteration = 0;
+ ChunkBeingShreddedNextSectorIdx = 0;
+ }
+ if (ChunkBeingShredded) {
+ if (ChunkBeingShreddedIteration == 0 && ChunkBeingShreddedNextSectorIdx == 0) {
+ Y_VERIFY(ChunkState[ChunkBeingShredded].OperationsInProgress == 0);
+ ChunkState[ChunkBeingShredded].OperationsInProgress++;
+ }
+ // Continue shredding the chunk: send a write request to the device using the iteration-specific pattern
+ THolder<TAlignedData>& payload = ShredPayload[ChunkBeingShreddedIteration];
+ if (payload == nullptr) {
+ payload = MakeHolder<TAlignedData>(Format.RoundUpToSectorSize(2097152));
+ ui8* data = payload->Get();
+ memset(data, ChunkBeingShreddedIteration == 0 ? 0x55 : 0xaa, payload->Size());
+ }
+ ui64 size = std::min((ui64)Format.ChunkSize - ChunkBeingShreddedNextSectorIdx * Format.SectorSize, (ui64)payload->Size());
+ ui64 offset = Format.Offset(ChunkBeingShredded, ChunkBeingShreddedNextSectorIdx);
+ ui64 reqIdx = ShredReqIdx++;
+ TCompletionAction *completionAction = new TChunkShredCompletion(this, ChunkBeingShredded, ChunkBeingShreddedNextSectorIdx, size, TReqId(TReqId::ChunkShred, reqIdx));
+ ++ChunkBeingShreddedInFlight;
+ ChunkBeingShreddedNextSectorIdx += size / Format.SectorSize;
+ Mon.ChunkShred.CountRequest(size);
+ BlockDevice->PwriteAsync(payload->Get(), size, offset, completionAction,
+ TReqId(TReqId::ChunkShred, reqIdx), {});
+ return;
+ }
+ break;
+ }
+ }
+
+ // If there are no free chunks unshredded, we should ask a vdisk to shred its free space
+ ui32 shreddedFreeChunks = Keeper.GetFreeChunkCount();
ui32 finishedCount = 0;
for (ui32 ownerId = 0; ownerId < OwnerData.size(); ++ownerId) {
TOwnerData &data = OwnerData[ownerId];
@@ -3971,22 +4070,31 @@ void TPDisk::ProgressShredState() {
} else if (data.ShredState != TOwnerData::VDISK_SHRED_STATE_SHRED_REQUESTED
&& data.ShredState != TOwnerData::VDISK_SHRED_STATE_SHRED_FINISHED) {
std::vector<TChunkIdx> chunksToShred;
- chunksToShred.reserve(ChunkState.size());
+ chunksToShred.reserve(shreddedFreeChunks/2);
for (TChunkIdx chunkIdx = 0; chunkIdx < ChunkState.size(); ++chunkIdx) {
- if (ChunkState[chunkIdx].OwnerId == ownerId) {
- // TODO(cthulhu): check if chunk is dirty
+ TChunkState& state = ChunkState[chunkIdx];
+ // We need to shred only chunks that got dirty before the current shred generation
+ if (state.OwnerId == ownerId && state.IsDirty && state.ShredGeneration < ShredGeneration) {
chunksToShred.push_back(chunkIdx);
+ if (chunksToShred.size() >= shreddedFreeChunks/2) {
+ break;
+ }
}
}
- THolder<TEvShredVDisk> shredRequest(new TEvShredVDisk(ShredGeneration, chunksToShred));
- LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
- "PDisk# " << PCtx->PDiskId
- << " sends shred request to VDisk# " << data.VDiskId
- << " ownerId# " << ownerId
- << " request# " << shredRequest->ToString());
- PCtx->ActorSystem->Send(new IEventHandle(data.CutLogId, PCtx->PDiskActor, shredRequest.Release()));
- data.ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_REQUESTED;
- data.LastShredGeneration = ShredGeneration;
+ if (chunksToShred.size() > 0) {
+ THolder<TEvShredVDisk> shredRequest(new TEvShredVDisk(ShredGeneration, chunksToShred));
+ LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
+ "PDisk# " << PCtx->PDiskId
+ << " sends shred request to VDisk# " << data.VDiskId
+ << " ownerId# " << ownerId
+ << " request# " << shredRequest->ToString());
+ PCtx->ActorSystem->Send(new IEventHandle(data.CutLogId, PCtx->PDiskActor, shredRequest.Release()));
+ data.ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_REQUESTED;
+ data.LastShredGeneration = ShredGeneration;
+ } else {
+ data.ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_FINISHED;
+ data.LastShredGeneration = ShredGeneration;
+ }
}
if (data.ShredState != TOwnerData::VDISK_SHRED_STATE_SHRED_FINISHED) {
LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
@@ -4170,12 +4278,52 @@ void TPDisk::ProcessShredVDiskResult(TShredVDiskResult& request) {
ShredRequesters.clear();
return;
}
- OwnerData[request.Owner].ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_FINISHED;
+ OwnerData[request.Owner].ShredState = TOwnerData::VDISK_SHRED_STATE_COMPACT_FINISHED;
ProgressShredState();
}
void TPDisk::ProcessMarkDirty(TMarkDirty& request) {
- Y_UNUSED(request);
+ LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
+ "ProcessMarkDirty at PDisk# " << PCtx->PDiskId
+ << " ShredGeneration# " << ShredGeneration
+ << " request# " << request.ToString());
+ {
+ bool isLogged = false;
+ ui64 markedDirty = 0;
+ TGuard<TMutex> guard(StateMutex);
+ for (auto chunkIdx : request.ChunksToMarkDirty) {
+ if (chunkIdx >= ChunkState.size()) {
+ if (!isLogged) {
+ isLogged = true;
+ LOG_CRIT_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
+ "MarkDirty contains invalid chunkIdx# " << chunkIdx << " for PDisk# " << PCtx->PDiskId
+ << " ShredGeneration# " << ShredGeneration << " request# " << request.ToString());
+ }
+ } else {
+ if (!ChunkState[chunkIdx].IsDirty) {
+ ChunkState[chunkIdx].IsDirty = true;
+ markedDirty++;
+ LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
+ "PDisk# " << PCtx->PDiskId << " marked chunkIdx# " << chunkIdx << " as dirty"
+ << " chunk.ShredGeneration# " << ChunkState[chunkIdx].ShredGeneration
+ << " ShredGeneration# " << ShredGeneration);
+ }
+ }
+ }
+ if (markedDirty > 0) {
+ // TODO(cthulhu): save dirty chunks to syslog
+ }
+ }
+}
+
+void TPDisk::ProcessChunkShredResult(TChunkShredResult& request) {
+ LOG_TRACE_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
+ "ProcessChunkShredResult at PDisk# " << PCtx->PDiskId
+ << " ShredGeneration# " << ShredGeneration
+ << " request# " << request.ToString());
+ Y_ABORT_UNLESS(ChunkBeingShreddedInFlight > 0);
+ --ChunkBeingShreddedInFlight;
+ ProgressShredState();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
index 94a1110ae3..6b7dcb4f40 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
@@ -157,7 +157,13 @@ public:
};
EShredState ShredState = EShredStateDefault;
ui64 ShredGeneration = 0;
+ TChunkIdx ChunkBeingShredded = 0;
+ ui64 ChunkBeingShreddedIteration = 0;
+ ui64 ChunkBeingShreddedNextSectorIdx = 0;
+ ui64 ShredReqIdx = 0;
+ std::atomic<ui64> ChunkBeingShreddedInFlight = 0;
std::deque<std::tuple<TActorId, ui64>> ShredRequesters;
+ THolder<TAlignedData> ShredPayload[2];
// Chunks that are owned by killed owner, but have operations InFlight
TVector<TChunkIdx> QuarantineChunks;
@@ -399,11 +405,13 @@ public:
void HandleNextWriteMetadata();
void ProcessWriteMetadataResult(TWriteMetadataResult& request);
+ TChunkIdx GetUnshreddedFreeChunk();
void ProgressShredState();
void ProcessShredPDisk(TShredPDisk& request);
void ProcessPreShredCompactVDiskResult(TPreShredCompactVDiskResult& request);
void ProcessShredVDiskResult(TShredVDiskResult& request);
void ProcessMarkDirty(TMarkDirty& request);
+ void ProcessChunkShredResult(TChunkShredResult& request);
void DropAllMetadataRequests();
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
index 09ea4f50d3..50f188854a 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
@@ -1038,6 +1038,10 @@ NKikimrProto::EReplyStatus TPDisk::BeforeLoggingCommitRecord(const TLogWrite &lo
if (logWrite.CommitRecord.DeleteToDecommitted) {
for (ui32 chunkIdx : logWrite.CommitRecord.DeleteChunks) {
TChunkState& state = ChunkState[chunkIdx];
+ if (!state.IsDirty) {
+ // TODO(cthulhu): log that chunk got dirty
+ state.IsDirty = true;
+ }
switch (state.CommitState) {
case TChunkState::DATA_RESERVED:
state.CommitState = TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS;
@@ -1057,6 +1061,10 @@ NKikimrProto::EReplyStatus TPDisk::BeforeLoggingCommitRecord(const TLogWrite &lo
} else {
for (ui32 chunkIdx : logWrite.CommitRecord.DeleteChunks) {
TChunkState& state = ChunkState[chunkIdx];
+ if (!state.IsDirty) {
+ // TODO(cthulhu): log that chunk got dirty
+ state.IsDirty = true;
+ }
if (state.HasAnyOperationsInProgress()) {
switch (state.CommitState) {
case TChunkState::DATA_RESERVED:
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_keeper.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_keeper.h
index 313d7acfd0..54dfa6e207 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_keeper.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_keeper.h
@@ -26,6 +26,8 @@ protected:
TFreeChunks TrimmedFreeChunks; // Trimmed free chunk list for fast allocation
TChunkTracker ChunkTracker;
+
+ friend class TPDisk;
public:
TKeeper(TPDiskMon &mon, TIntrusivePtr<TPDiskConfig> cfg)
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp
index 24d1b19232..d9107d9186 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp
@@ -235,6 +235,7 @@ TPDiskMon::TPDiskMon(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& count
IO_REQ_INIT(PDiskGroup, WriteHuge, WriteHuge);
IO_REQ_INIT(PDiskGroup, WriteComp, WriteComp);
IO_REQ_INIT(PDiskGroup, Trim, WriteTrim);
+ IO_REQ_INIT(PDiskGroup, ChunkShred, WriteShred);
IO_REQ_INIT_IF_EXTENDED(PDiskGroup, ReadSyncLog, ReadSyncLog);
IO_REQ_INIT_IF_EXTENDED(PDiskGroup, ReadComp, ReadComp);
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h
index 360422822f..347b35e5df 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h
@@ -479,7 +479,7 @@ struct TPDiskMon {
TIoCounters WriteHuge;
TIoCounters WriteComp;
TIoCounters Trim;
-
+ TIoCounters ChunkShred;
TIoCounters ReadSyncLog;
TIoCounters ReadComp;
TIoCounters ReadOnlineRt;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h
index ace05ce80f..7a72906294 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h
@@ -93,6 +93,8 @@ struct TReqId {
PreShredCompactVDiskResult = 74,
ShredVDiskResult = 75,
MarkDirty = 76,
+ ChunkShred = 77,
+ ChunkShredResult = 78,
};
// 56 bit idx, 8 bit source
@@ -161,6 +163,7 @@ enum class ERequestType {
RequestPreShredCompactVDiskResult,
RequestShredVDiskResult,
RequestMarkDirty,
+ RequestChunkShredResult,
};
inline IOutputStream& operator <<(IOutputStream& out, const TReqId& reqId) {
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h
index a5f75eae97..97119c34a5 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h
@@ -1018,6 +1018,36 @@ public:
}
};
+//
+// TChunkShredResult
+//
+class TChunkShredResult : public TRequestBase {
+public:
+ TChunkIdx Chunk;
+ ui32 SectorIdx;
+ ui64 ShredSize;
+
+ TChunkShredResult(TChunkIdx chunk, ui32 sectorIdx, ui64 shredSize, TAtomicBase reqIdx)
+ : TRequestBase(TActorId(), TReqId(TReqId::ChunkShredResult, reqIdx), OwnerSystem, 0, NPriInternal::Other, {})
+ , Chunk(chunk)
+ , SectorIdx(sectorIdx)
+ , ShredSize(shredSize)
+ {}
+
+ ERequestType GetType() const override {
+ return ERequestType::RequestChunkShredResult;
+ }
+
+ TString ToString() const {
+ TStringStream str;
+ str << "TChunkShredResult { Chunk# " << Chunk
+ << " SectorIdx# " << SectorIdx
+ << " ShredSize# " << ShredSize << "}";
+ return str.Str();
+ }
+};
+
+
class TStopDevice : public TRequestBase {
public:
TStopDevice(TAtomicBase reqIdx)
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
index 86efd87864..56fa758383 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
@@ -279,8 +279,9 @@ struct TChunkState {
std::atomic<i64> OperationsInProgress;
TOwner OwnerId;
ECommitState CommitState;
+ std::atomic<bool> IsDirty;
ui64 CommitsInProgress;
-
+ ui64 ShredGeneration;
TChunkState()
: Nonce(0)
, CurrentNonce(0)
@@ -288,7 +289,9 @@ struct TChunkState {
, OperationsInProgress(0)
, OwnerId(OwnerUnallocated)
, CommitState(FREE)
+ , IsDirty(false)
, CommitsInProgress(0)
+ , ShredGeneration(0)
{}
bool HasAnyOperationsInProgress() const {
@@ -308,7 +311,9 @@ struct TChunkState {
OUT_VAR(OperationsInProgress.load());
OUT_VAR(OwnerId);
OUT_VAR(CommitState);
+ OUT_VAR(IsDirty.load());
OUT_VAR(CommitsInProgress);
+ OUT_VAR(ShredGeneration);
str << "}";
return str.Str();
}
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
index f0e104abd6..36505c86fa 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
@@ -1448,6 +1448,21 @@ Y_UNIT_TEST_SUITE(ShredPDisk) {
vdisk.SendEvLogSync();
testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration));
vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, "");
+ THolder<NPDisk::TEvShredPDiskResult> res = testCtx.TestResponse<NPDisk::TEvShredPDiskResult>(nullptr, NKikimrProto::OK);
+ UNIT_ASSERT_VALUES_EQUAL(res->ErrorReason, "");
+ UNIT_ASSERT_VALUES_EQUAL(res->ShredGeneration, shredGeneration);
+ }
+ Y_UNIT_TEST(SimpleShredDirtyChunks) {
+ ui64 shredGeneration = 1;
+ TActorTestContext testCtx{{}};
+ TVDiskMock vdisk(&testCtx);
+ vdisk.InitFull();
+ vdisk.SendEvLogSync();
+ vdisk.ReserveChunk();
+ vdisk.CommitReservedChunks();
+ vdisk.MarkCommitedChunksDirty();
+ testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration));
+ vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, "");
vdisk.RespondToShred(shredGeneration, NKikimrProto::OK, "");
THolder<NPDisk::TEvShredPDiskResult> res = testCtx.TestResponse<NPDisk::TEvShredPDiskResult>(nullptr, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->ErrorReason, "");
@@ -1474,6 +1489,9 @@ Y_UNIT_TEST_SUITE(ShredPDisk) {
TVDiskMock vdisk(&testCtx);
vdisk.InitFull();
vdisk.SendEvLogSync();
+ vdisk.ReserveChunk();
+ vdisk.CommitReservedChunks();
+ vdisk.MarkCommitedChunksDirty();
testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration));
vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, "");
THolder<NPDisk::TEvShredVDisk> evReq = testCtx.Recv<NPDisk::TEvShredVDisk>();
@@ -1494,11 +1512,18 @@ Y_UNIT_TEST_SUITE(ShredPDisk) {
vdisk2.InitFull();
vdisk2.SendEvLogSync();
testCtx.RestartPDiskSync();
+
vdisk.InitFull();
vdisk.SendEvLogSync();
+ vdisk.ReserveChunk();
+ vdisk.CommitReservedChunks();
+ vdisk.MarkCommitedChunksDirty();
testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration));
vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, "");
vdisk2.InitFull();
+ vdisk2.ReserveChunk();
+ vdisk2.CommitReservedChunks();
+ vdisk2.MarkCommitedChunksDirty();
vdisk2.SendEvLogSync();
vdisk2.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, "");
vdisk.RespondToShred(shredGeneration, NKikimrProto::OK, "");
@@ -1513,6 +1538,9 @@ Y_UNIT_TEST_SUITE(ShredPDisk) {
TVDiskMock vdisk(&testCtx);
vdisk.InitFull();
vdisk.SendEvLogSync();
+ vdisk.ReserveChunk();
+ vdisk.CommitReservedChunks();
+ vdisk.MarkCommitedChunksDirty();
testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration));
THolder<NPDisk::TEvPreShredCompactVDisk> evReq = testCtx.Recv<NPDisk::TEvPreShredCompactVDisk>();
UNIT_ASSERT_VALUES_UNEQUAL(evReq.Get(), nullptr);
@@ -1530,6 +1558,9 @@ Y_UNIT_TEST_SUITE(ShredPDisk) {
TVDiskMock vdisk(&testCtx);
vdisk.InitFull();
vdisk.SendEvLogSync();
+ vdisk.ReserveChunk();
+ vdisk.CommitReservedChunks();
+ vdisk.MarkCommitedChunksDirty();
testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration));
vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, "");
THolder<NPDisk::TEvShredVDisk> evReq = testCtx.Recv<NPDisk::TEvShredVDisk>();
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
index 687dee85f9..130643646b 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
@@ -279,7 +279,6 @@ struct TVDiskMock {
UNIT_ASSERT_C(commited.empty(), "there are leaked chunks# " << FormatList(commited));
}
-
void ReserveChunk() {
const auto evReserveRes = TestCtx->TestResponse<NPDisk::TEvChunkReserveResult>(
new NPDisk::TEvChunkReserve(PDiskParams->Owner, PDiskParams->OwnerRound, 1),
@@ -298,6 +297,16 @@ struct TVDiskMock {
reservedChunks.clear();
}
+ void MarkCommitedChunksDirty() {
+ auto& commited = Chunks[EChunkState::COMMITTED];
+ TStackVec<TChunkIdx, 1> chunksToMark;
+ for (auto it = commited.begin(); it != commited.end(); ++it) {
+ chunksToMark.push_back(*it);
+ }
+ auto evMark = MakeHolder<NPDisk::TEvMarkDirty>(PDiskParams->Owner, PDiskParams->OwnerRound, chunksToMark);
+ TestCtx->Send(evMark.Release());
+ }
+
void DeleteCommitedChunks() {
auto& commited = Chunks[EChunkState::COMMITTED];
NPDisk::TCommitRecord rec;
@@ -366,6 +375,17 @@ struct TVDiskMock {
void RespondToShred(ui64 shredGeneration, NKikimrProto::EReplyStatus status, const TString& errorReason) {
THolder<NPDisk::TEvShredVDisk> evReq = TestCtx->Recv<NPDisk::TEvShredVDisk>();
if (evReq) {
+ auto& commited = Chunks[EChunkState::COMMITTED];
+ NPDisk::TCommitRecord rec;
+ rec.DeleteChunks = TVector<TChunkIdx>();
+ for (const TChunkIdx &idx : evReq->ChunksToShred) {
+ if (commited.contains(idx)) {
+ rec.DeleteChunks.push_back(idx);
+ Chunks[EChunkState::DELETED].insert(idx);
+ commited.erase(idx);
+ }
+ }
+ SendEvLogImpl(1, rec);
TestCtx->Send(new NPDisk::TEvShredVDiskResult(PDiskParams->Owner, PDiskParams->OwnerRound,
shredGeneration, status, errorReason));
}