diff options
author | cthulhu <cthulhu@ydb.tech> | 2022-08-31 10:13:43 +0300 |
---|---|---|
committer | cthulhu <cthulhu@ydb.tech> | 2022-08-31 10:13:43 +0300 |
commit | f834aa90cf558e4ede2fe69b1ffc7d7bb30a172a (patch) | |
tree | 336604e96c49de422496c69db5d34734268dbbde | |
parent | d04c6ee6620c95efeef71dbf0cae8b3fbd8906fa (diff) | |
download | ydb-f834aa90cf558e4ede2fe69b1ffc7d7bb30a172a.tar.gz |
Implement pdisk chunk decommit and forget,
Implement pdisk chunk decommit and forget,
16 files changed, 443 insertions, 36 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 6e4275e9c6..d87a8f7297 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -683,6 +683,7 @@ struct TEvBlobStorage { EvHugeUnlockChunks, EvVDiskStatRequest, EvGetLogoBlobRequest, + EvChunkForget, EvYardInitResult = EvPut + 9 * 512, /// 268 636 672 EvLogResult, @@ -729,6 +730,7 @@ struct TEvBlobStorage { EvHugeStatResult, EvVDiskStatResponse, EvGetLogoBlobResponse, + EvChunkForgetResult, // internal proxy interface EvUnusedLocal1 = EvPut + 10 * 512, // Not used. /// 268 637 184 diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk.h index 32ced7d704..b7ae8cc7a9 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk.h @@ -26,10 +26,13 @@ struct TCommitRecord { TVector<TChunkIdx> CommitChunks; TVector<TChunkIdx> DeleteChunks; bool IsStartingPoint; + bool DeleteToDecommitted; // 1 == set chunks to Decommitted state that requires a ChunkForget event or a restart + // the value of DeleteToDecommitted is not stored as a part of the commit record. TCommitRecord() : FirstLsnToKeep(0) , IsStartingPoint(false) + , DeleteToDecommitted(false) {} void ValidateChunks(TVector<TChunkIdx> &chunks) { @@ -62,6 +65,7 @@ struct TCommitRecord { REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&DeleteChunks[0], sizeof(DeleteChunks[0]) * DeleteChunks.size()); } REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&IsStartingPoint, sizeof(IsStartingPoint)); + REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&DeleteToDecommitted, sizeof(DeleteToDecommitted)); } TString ToString() const { @@ -69,6 +73,7 @@ struct TCommitRecord { str << "{CommitRecord"; str << " FirstLsnToKeep# " << FirstLsnToKeep; str << " IsStartingPoint# " << IsStartingPoint; + str << " DeleteToDecommitted# " << DeleteToDecommitted; str << " CommitChunks# "; PrintChunks(str, CommitChunks); str << " DeleteChunks# "; @@ -81,8 +86,7 @@ struct TCommitRecord { return sizeof(TCommitRecord) + (CommitChunks.size() + DeleteChunks.size()) * sizeof(ui32); } -protected: - void PrintChunks(IOutputStream &str, const TVector<TChunkIdx> vec) const { + static void PrintChunks(IOutputStream &str, const TVector<TChunkIdx> &vec) { str << "["; for (ui32 i = 0; i < vec.size(); i++) { if (i) @@ -639,6 +643,70 @@ struct TEvChunkReserveResult : public TEventLocal<TEvChunkReserveResult, TEvBlob }; //////////////////////////////////////////////////////////////////////////// +// CHUNK FORGET +//////////////////////////////////////////////////////////////////////////// +struct TEvChunkForget : public TEventLocal<TEvChunkForget, TEvBlobStorage::EvChunkForget> { + TOwner Owner; + TOwnerRound OwnerRound; + TVector<TChunkIdx> ForgetChunks; + + TEvChunkForget(TOwner owner, TOwnerRound ownerRound) + : Owner(owner) + , OwnerRound(ownerRound) + {} + + TEvChunkForget(TOwner owner, TOwnerRound ownerRound, TVector<TChunkIdx> forgetChunks) + : Owner(owner) + , OwnerRound(ownerRound) + , ForgetChunks(std::move(forgetChunks)) + {} + + TString ToString() const { + return ToString(*this); + } + + static TString ToString(const TEvChunkForget &record) { + TStringStream str; + str << "{EvChunkForget ownerId# " << (ui32)record.Owner; + str << " ownerRound# " << record.OwnerRound; + str << " ForgetChunks# "; + TCommitRecord::PrintChunks(str, record.ForgetChunks); + str << "}"; + return str.Str(); + } +}; + +struct TEvChunkForgetResult : public TEventLocal<TEvChunkForgetResult, TEvBlobStorage::EvChunkForgetResult> { + NKikimrProto::EReplyStatus Status; + TStatusFlags StatusFlags; + TString ErrorReason; + + TEvChunkForgetResult(NKikimrProto::EReplyStatus status, TStatusFlags statusFlags) + : Status(status) + , StatusFlags(statusFlags) + {} + + TEvChunkForgetResult(NKikimrProto::EReplyStatus status, TStatusFlags statusFlags, TString &errorReason) + : Status(status) + , StatusFlags(statusFlags) + , ErrorReason(errorReason) + {} + + TString ToString() const { + return ToString(*this); + } + + static TString ToString(const TEvChunkForgetResult &record) { + TStringStream str; + str << "{EvChunkForgetResult Status# " << NKikimrProto::EReplyStatus_Name(record.Status).data(); + str << " ErrorReason# \"" << record.ErrorReason << "\""; + str << " StatusFlags# " << StatusFlagsToString(record.StatusFlags); + str << "}"; + return str.Str(); + } +}; + +//////////////////////////////////////////////////////////////////////////// // CHUNK READ //////////////////////////////////////////////////////////////////////////// struct TEvChunkRead : public TEventLocal<TEvChunkRead, TEvBlobStorage::EvChunkRead> { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp index ec4ff19681..f3951b855a 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp @@ -623,6 +623,12 @@ public: PDisk->Mon.ChunkReserve.CountResponse(); } + void ErrorHandle(NPDisk::TEvChunkForget::TPtr &ev) { + PDisk->Mon.ChunkForget.CountRequest(); + Send(ev->Sender, new NPDisk::TEvChunkForgetResult(NKikimrProto::CORRUPTED, 0, StateErrorReason)); + PDisk->Mon.ChunkForget.CountResponse(); + } + void ErrorHandle(NPDisk::TEvYardControl::TPtr &ev) { const NPDisk::TEvYardControl &evControl = *ev->Get(); Y_VERIFY(PDisk); @@ -727,6 +733,11 @@ public: PDisk->InputRequest(request); } + void Handle(NPDisk::TEvChunkForget::TPtr &ev) { + auto* request = PDisk->ReqCreator.CreateFromEv<TChunkForget>(*ev->Get(), ev->Sender); + PDisk->InputRequest(request); + } + void Handle(NPDisk::TEvChunksLock::TPtr &ev) { auto* request = PDisk->ReqCreator.CreateFromEv<TChunksLock>(*ev->Get(), ev->Sender); PDisk->InputRequest(request); @@ -1001,6 +1012,7 @@ public: hFunc(NPDisk::TEvHarakiri, ErrorHandle); hFunc(NPDisk::TEvSlay, InitHandle); hFunc(NPDisk::TEvChunkReserve, ErrorHandle); + hFunc(NPDisk::TEvChunkForget, ErrorHandle); hFunc(NPDisk::TEvYardControl, InitHandle); hFunc(NPDisk::TEvAskForCutLog, ErrorHandle); hFunc(NPDisk::TEvWhiteboardReportResult, Handle); @@ -1030,6 +1042,7 @@ public: hFunc(NPDisk::TEvHarakiri, Handle); hFunc(NPDisk::TEvSlay, Handle); hFunc(NPDisk::TEvChunkReserve, Handle); + hFunc(NPDisk::TEvChunkForget, Handle); hFunc(NPDisk::TEvChunksLock, Handle); hFunc(NPDisk::TEvChunksUnlock, Handle); hFunc(NPDisk::TEvYardControl, Handle); @@ -1059,6 +1072,7 @@ public: hFunc(NPDisk::TEvHarakiri, ErrorHandle); hFunc(NPDisk::TEvSlay, ErrorHandle); hFunc(NPDisk::TEvChunkReserve, ErrorHandle); + hFunc(NPDisk::TEvChunkForget, ErrorHandle); hFunc(NPDisk::TEvYardControl, ErrorHandle); hFunc(NPDisk::TEvAskForCutLog, ErrorHandle); hFunc(NPDisk::TEvWhiteboardReportResult, Handle); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp index 937e44ec9b..54c671d312 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp @@ -85,6 +85,7 @@ TPDisk::TPDisk(const TIntrusivePtr<TPDiskConfig> cfg, const TIntrusivePtr<::NMon JointChunkWrites.reserve(16 << 10); JointLogWrites.reserve(16 << 10); JointCommits.reserve(16 << 10); + JointChunkForgets.reserve(16 << 10); } TString TPDisk::DynamicStateToString(bool isMultiline) { @@ -332,6 +333,7 @@ void TPDisk::Stop() { } JointLogWrites.clear(); JointCommits.clear(); + JointChunkForgets.clear(); for (const auto& req : FastOperationsQueue) { TRequestBase::AbortDelete(req.get(), ActorSystem); } @@ -1159,6 +1161,118 @@ void TPDisk::ChunkReserve(TChunkReserve &evChunkReserve) { guard.Release(); ActorSystem->Send(evChunkReserve.Sender, result.Release()); Mon.ChunkReserve.CountResponse(); + +} +bool TPDisk::ValidateForgetChunk(ui32 chunkIdx, TOwner owner, TStringStream& outErrorReason) { + TGuard<TMutex> guard(StateMutex); + if (chunkIdx >= ChunkState.size()) { + outErrorReason << "PDiskId# " << PDiskId + << " Can't forget chunkIdx# " << chunkIdx + << " > total# " << ChunkState.size() + << " ownerId# " << owner + << " Marker# BPD89"; + LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_PDISK, outErrorReason.Str()); + return false; + } + if (ChunkState[chunkIdx].OwnerId != owner) { + outErrorReason << "PDiskId# " << PDiskId + << " Can't forget chunkIdx# " << chunkIdx + << ", ownerId# " << owner + << " != real ownerId# " << ChunkState[chunkIdx].OwnerId + << " Marker# BPD90"; + LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_PDISK, outErrorReason.Str()); + return false; + } + if (ChunkState[chunkIdx].CommitState != TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS + && ChunkState[chunkIdx].CommitState != TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS + && ChunkState[chunkIdx].CommitState != TChunkState::DATA_DECOMMITTED) { + outErrorReason << "PDiskId# " << PDiskId + << " Can't forget chunkIdx# " << chunkIdx + << " in CommitState# " << ChunkState[chunkIdx].CommitState + << " ownerId# " << owner << " Marker# BPD91"; + LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_PDISK, outErrorReason.Str()); + return false; + } + return true; +} + +void TPDisk::ChunkForget(TChunkForget &evChunkForget) { + TStringStream errorReason; + TGuard<TMutex> guard(StateMutex); + + THolder<NPDisk::TEvChunkForgetResult> result; + + bool isOk = true; + + for (ui32 chunkIdx : evChunkForget.ForgetChunks) { + if (!ValidateForgetChunk(chunkIdx, evChunkForget.Owner, errorReason)) { + result = MakeHolder<NPDisk::TEvChunkForgetResult>(NKikimrProto::ERROR, + NotEnoughDiskSpaceStatusFlags(evChunkForget.Owner, evChunkForget.OwnerGroupType), + errorReason.Str()); + isOk = false; + break; + } + } + if (isOk) { + for (ui32 chunkIdx : evChunkForget.ForgetChunks) { + TChunkState& state = ChunkState[chunkIdx]; + if (state.HasAnyOperationsInProgress()) { + switch (state.CommitState) { + case TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS: + Mon.UncommitedDataChunks->Dec(); + state.CommitState = TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE; + QuarantineChunks.push_back(chunkIdx); + break; + case TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS: + Mon.UncommitedDataChunks->Dec(); + state.CommitState = TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE; + QuarantineChunks.push_back(chunkIdx); + break; + case TChunkState::DATA_DECOMMITTED: + Mon.UncommitedDataChunks->Dec(); + state.CommitState = TChunkState::DATA_ON_QUARANTINE; + QuarantineChunks.push_back(chunkIdx); + break; + default: + Y_FAIL_S("PDiskId# " << PDiskId + << " ChunkForget with in flight, ownerId# " << (ui32)evChunkForget.Owner + << " chunkIdx# " << chunkIdx << " unexpected commitState# " << state.CommitState); + } + } else { + switch (state.CommitState) { + case TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS: + Mon.UncommitedDataChunks->Dec(); + state.CommitState = TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS; + break; + case TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS: + Mon.UncommitedDataChunks->Dec(); + state.CommitState = TChunkState::DATA_COMMITTED_DELETE_IN_PROGRESS; + break; + case TChunkState::DATA_DECOMMITTED: + Y_VERIFY_S(state.CommitsInProgress == 0, + "PDiskId# " << PDiskId << " chunkIdx# " << chunkIdx << " state# " << state.ToString()); + LOG_INFO(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " chunkIdx# %" PRIu32 + " forgotten, ownerId# %" PRIu32 " -> %" PRIu32, + (ui32)PDiskId, (ui32)chunkIdx, (ui32)state.OwnerId, (ui32)OwnerUnallocated); + Y_VERIFY(state.OwnerId == evChunkForget.Owner); + Mon.UncommitedDataChunks->Dec(); + state.CommitState = TChunkState::FREE; + Keeper.PushFreeOwnerChunk(evChunkForget.Owner, chunkIdx); + break; + default: + Y_FAIL_S("PDiskId# " << PDiskId + << " ChunkForget, ownerId# " << (ui32)evChunkForget.Owner + << " chunkIdx# " << chunkIdx << " unexpected commitState# " << state.CommitState); + } + } + } + result = MakeHolder<NPDisk::TEvChunkForgetResult>(NKikimrProto::OK, 0); + result->StatusFlags = GetStatusFlags(evChunkForget.Owner, evChunkForget.OwnerGroupType); + } + + guard.Release(); + ActorSystem->Send(evChunkForget.Sender, result.Release()); + Mon.ChunkForget.CountResponse(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -1692,7 +1806,10 @@ void TPDisk::KillOwner(TOwner owner, TOwnerRound killOwnerRound, TCompletionEven for (ui32 i = 0; i < ChunkState.size(); ++i) { TChunkState &state = ChunkState[i]; if (state.OwnerId == owner) { - if (state.CommitState == TChunkState::DATA_RESERVED) { + if (state.CommitState == TChunkState::DATA_RESERVED + || state.CommitState == TChunkState::DATA_DECOMMITTED + || state.CommitState == TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS + || state.CommitState == TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS) { Mon.UncommitedDataChunks->Dec(); } else if (state.CommitState == TChunkState::DATA_COMMITTED) { Mon.CommitedDataChunks->Dec(); @@ -1712,14 +1829,23 @@ void TPDisk::KillOwner(TOwner owner, TOwnerRound killOwnerRound, TCompletionEven || state.CommitState == TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS || state.CommitState == TChunkState::DATA_COMMITTED_DELETE_IN_PROGRESS || state.CommitState == TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE - || state.CommitState == TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE) { + || state.CommitState == TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE + || state.CommitState == TChunkState::DATA_DECOMMITTED + || state.CommitState == TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS + || state.CommitState == TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS + ) { if (state.CommitState == TChunkState::DATA_RESERVED - || state.CommitState == TChunkState::DATA_COMMITTED) { + || state.CommitState == TChunkState::DATA_COMMITTED + || state.CommitState == TChunkState::DATA_DECOMMITTED) { state.CommitState = TChunkState::DATA_ON_QUARANTINE; } else if (state.CommitState == TChunkState::DATA_COMMITTED_DELETE_IN_PROGRESS) { state.CommitState = TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS; } else if (state.CommitState == TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE) { state.CommitState = TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE; + } else if (state.CommitState == TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS + || state.CommitState == TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS) { + state.CommitState = TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE; + QuarantineChunks.push_back(i); } if (state.CommitState != TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE @@ -2102,13 +2228,24 @@ void TPDisk::ProcessFastOperationsQueue() { MarkChunksAsReleased(static_cast<TReleaseChunks&>(*req)); break; default: - Y_FAIL(); + Y_FAIL_S("Unexpected request type# " << (ui64)req->GetType()); break; } } FastOperationsQueue.clear(); } +void TPDisk::ProcessChunkForgetQueue() { + if (JointChunkForgets.empty()) + return; + + for (auto& req : JointChunkForgets) { + ChunkForget(*req); + } + + JointChunkForgets.clear(); +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Drive info and write cache //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -2561,6 +2698,14 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) { ownerData.WriteThroughput.Increment(log->Data.size(), ActorSystem->Timestamp()); break; } + case ERequestType::RequestChunkForget: + { + auto *forget = static_cast<TChunkForget*>(request); + TOwnerData &ownerData = OwnerData[forget->Owner]; + forget->SetOwnerGroupType(ownerData.IsStaticGroupOwner()); + forget->JobKind = NSchLab::JobKindWrite; + break; + } case ERequestType::RequestYardInit: break; case ERequestType::RequestCheckSpace: @@ -2861,6 +3006,12 @@ void TPDisk::RouteRequest(TRequestBase *request) { } break; } + case ERequestType::RequestChunkForget: + { + TChunkForget *forget = static_cast<TChunkForget*>(request); + JointChunkForgets.push_back(std::unique_ptr<TChunkForget>(forget)); + break; + } default: Y_FAIL_S("RouteRequest, unexpected request type# " << ui64(request->GetType())); break; @@ -3080,7 +3231,7 @@ void TPDisk::Update() { // Processing bool isNonLogWorkloadPresent = !JointChunkWrites.empty() || !FastOperationsQueue.empty() || - !JointChunkReads.empty() || !JointLogReads.empty() || !JointChunkTrims.empty(); + !JointChunkReads.empty() || !JointLogReads.empty() || !JointChunkTrims.empty() || !JointChunkForgets.empty(); bool isLogWorkloadPresent = !JointLogWrites.empty(); bool isNothingToDo = true; if (isLogWorkloadPresent || isNonLogWorkloadPresent) { @@ -3146,6 +3297,7 @@ void TPDisk::Update() { if (tact != ETact::TactLc) { ProcessLogWriteQueueAndCommits(); } + ProcessChunkForgetQueue(); LastTact = tact; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h index 37a86da126..e0704efdd9 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h @@ -73,7 +73,7 @@ public: TVector<TLogWrite*> JointLogWrites; TVector<TLogWrite*> JointCommits; TVector<TChunkTrim*> JointChunkTrims; - + TVector<std::unique_ptr<TChunkForget>> JointChunkForgets; TVector<std::unique_ptr<TRequestBase>> FastOperationsQueue; TDeque<TRequestBase*> PausedQueue; std::set<std::unique_ptr<TYardInit>> PendingYardInits; @@ -273,6 +273,8 @@ public: // Chunk reservation TVector<TChunkIdx> AllocateChunkForOwner(const TRequestBase *req, const ui32 count, TString &errorReason); void ChunkReserve(TChunkReserve &evChunkReserve); + bool ValidateForgetChunk(ui32 chunkIdx, TOwner owner, TStringStream& outErrorReason); + void ChunkForget(TChunkForget &evChunkForget); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Whiteboard and HTTP reports creation void WhiteboardReport(TWhiteboardReport &whiteboardReport); // Called by actor @@ -313,6 +315,7 @@ public: //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Update process void ProcessLogWriteQueueAndCommits(); + void ProcessChunkForgetQueue(); void ProcessChunkWriteQueue(); void ProcessChunkReadQueue(); void ProcessLogReadQueue(); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp index 5db5cae52b..68aee602f8 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp @@ -866,44 +866,65 @@ NKikimrProto::EReplyStatus TPDisk::BeforeLoggingCommitRecord(const TLogWrite &lo } ++ChunkState[chunkIdx].CommitsInProgress; } - for (ui32 chunkIdx : logWrite.CommitRecord.DeleteChunks) { - TChunkState& state = ChunkState[chunkIdx]; - if (state.HasAnyOperationsInProgress()) { + if (logWrite.CommitRecord.DeleteToDecommitted) { + for (ui32 chunkIdx : logWrite.CommitRecord.DeleteChunks) { + TChunkState& state = ChunkState[chunkIdx]; switch (state.CommitState) { case TChunkState::DATA_RESERVED: - Mon.UncommitedDataChunks->Dec(); - state.CommitState = TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE; + state.CommitState = TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS; break; case TChunkState::DATA_COMMITTED: Mon.CommitedDataChunks->Dec(); - LOG_DEBUG(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 - " Line# %" PRIu32 " --CommitedDataChunks# %" PRIi64 " chunkIdx# %" PRIu32 " Marker# BPD10", - (ui32)PDiskId, (ui32)__LINE__, (i64)Mon.CommitedDataChunks->Val(), (ui32)chunkIdx); - state.CommitState = TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE; + Mon.UncommitedDataChunks->Inc(); + state.CommitState = TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS; break; default: - Y_FAIL_S("PDiskID# " << PDiskId << " can't delete chunkIdx# " << chunkIdx + Y_FAIL_S("PDiskID# " << PDiskId << " can't delete to decomitted chunkIdx# " << chunkIdx << " request ownerId# " << logWrite.Owner - << " with operations in progress as it is in unexpected CommitState# " << state.ToString()); + << " as it is in unexpected CommitState# " << state.ToString()); break; } - QuarantineChunks.push_back(chunkIdx); - LOG_INFO_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDiskId - << " push chunk on QuarantineChunks because it has operations in flight" - << " chunkIdx# " << chunkIdx - << " ownerId# " << logWrite.Owner - << " state# " << state.ToString() - << " Marker# BPD78"); - } else if (state.CommitState == TChunkState::DATA_RESERVED) { - Mon.UncommitedDataChunks->Dec(); - state.CommitState = TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS; - } else if (state.CommitState == TChunkState::DATA_COMMITTED) { - Mon.CommitedDataChunks->Dec(); - state.CommitState = TChunkState::DATA_COMMITTED_DELETE_IN_PROGRESS; - } else { - Y_FAIL_S("PDiskID# " << PDiskId << " can't delete chunkIdx# " << chunkIdx - << " request ownerId# " << logWrite.Owner - << " as it is in unexpected CommitState# " << state.ToString()); + } + } else { + for (ui32 chunkIdx : logWrite.CommitRecord.DeleteChunks) { + TChunkState& state = ChunkState[chunkIdx]; + if (state.HasAnyOperationsInProgress()) { + switch (state.CommitState) { + case TChunkState::DATA_RESERVED: + Mon.UncommitedDataChunks->Dec(); + state.CommitState = TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE; + break; + case TChunkState::DATA_COMMITTED: + Mon.CommitedDataChunks->Dec(); + LOG_DEBUG(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 + " Line# %" PRIu32 " --CommitedDataChunks# %" PRIi64 " chunkIdx# %" PRIu32 " Marker# BPD10", + (ui32)PDiskId, (ui32)__LINE__, (i64)Mon.CommitedDataChunks->Val(), (ui32)chunkIdx); + state.CommitState = TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE; + break; + default: + Y_FAIL_S("PDiskID# " << PDiskId << " can't delete chunkIdx# " << chunkIdx + << " request ownerId# " << logWrite.Owner + << " with operations in progress as it is in unexpected CommitState# " << state.ToString()); + break; + } + QuarantineChunks.push_back(chunkIdx); + LOG_INFO_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDiskId + << " push chunk on QuarantineChunks because it has operations in flight" + << " chunkIdx# " << chunkIdx + << " ownerId# " << logWrite.Owner + << " state# " << state.ToString() + << " Marker# BPD78"); + } else if (state.CommitState == TChunkState::DATA_RESERVED) { + Mon.UncommitedDataChunks->Dec(); + state.CommitState = TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS; + } else if (state.CommitState == TChunkState::DATA_COMMITTED) { + Mon.CommitedDataChunks->Dec(); + state.CommitState = TChunkState::DATA_COMMITTED_DELETE_IN_PROGRESS; + } else { + Y_FAIL_S("PDiskID# " << PDiskId << " can't delete chunkIdx# " << chunkIdx + << " request ownerId# " << logWrite.Owner + << " as it is in unexpected CommitState# " << state.ToString()); + } } } @@ -1034,6 +1055,12 @@ void TPDisk::DeleteChunk(ui32 chunkIdx, TOwner owner) { Y_VERIFY(state.OwnerId == owner); // TODO DELETE state.CommitState = TChunkState::DATA_ON_QUARANTINE; break; + case TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS: + [[fallthrough]]; + case TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS: + state.CommitState = TChunkState::DATA_DECOMMITTED; + break; + default: Y_FAIL_S("PDiskID# " << PDiskId << " can't delete chunkIdx# " << chunkIdx << " requesting ownerId# " << owner diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp index 7417cd837f..a19631f8ed 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp @@ -188,6 +188,7 @@ TPDiskMon::TPDiskMon(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& count CheckSpace.Setup(PDiskGroup, "YardCheckSpace"); YardConfigureScheduler.Setup(PDiskGroup, "YardConfigureScheduler"); ChunkReserve.Setup(PDiskGroup, "YardChunkReserve"); + ChunkForget.Setup(PDiskGroup, "YardChunkForget"); Harakiri.Setup(PDiskGroup, "YardHarakiri"); YardSlay.Setup(PDiskGroup, "YardSlay"); YardControl.Setup(PDiskGroup, "YardControl"); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h index 82efac7bad..84fd08d685 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h @@ -710,6 +710,7 @@ struct TPDiskMon { TReqCounters CheckSpace; TReqCounters YardConfigureScheduler; TReqCounters ChunkReserve; + TReqCounters ChunkForget; TReqCounters Harakiri; TReqCounters YardSlay; TReqCounters YardControl; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h index 737ad571c7..88adf2d6e1 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h @@ -52,6 +52,10 @@ private: request->GateId = GateLog; request->IsSensitive = true; return; + case ERequestType::RequestChunkForget: + request->GateId = GateLog; + request->IsSensitive = true; + return; case ERequestType::RequestChunkRead: request->IsFast = (request->PriorityClass == NPriRead::HullOnlineOther); request->IsSensitive = (request->PriorityClass == NPriRead::HullOnlineRt); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h index c0824b6002..21d680884f 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h @@ -82,6 +82,7 @@ struct TReqId { TryTrimChunk = 63, ReleaseChunks = 64, StopDevice = 65, + ChunkForget = 66, }; // 56 bit idx, 8 bit source @@ -139,6 +140,7 @@ enum class ERequestType { RequestTryTrimChunk, RequestReleaseChunks, RequestStopDevice, + RequestChunkForget, }; inline IOutputStream& operator <<(IOutputStream& out, const TReqId& reqId) { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h index 61c0996e08..9d7797adbf 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h @@ -667,6 +667,27 @@ public: }; // +// TChunkForget +// +class TChunkForget : public TRequestBase { +public: + TVector<TChunkIdx> ForgetChunks; + + TChunkForget(const NPDisk::TEvChunkForget &ev, const TActorId &sender, TAtomicBase reqIdx) + : TRequestBase(sender, TReqId(TReqId::ChunkForget, reqIdx), ev.Owner, ev.OwnerRound, NPriInternal::LogWrite) + , ForgetChunks(std::move(ev.ForgetChunks)) + {} + + ERequestType GetType() const override { + return ERequestType::RequestChunkForget; + } + + void EstimateCost(const TDriveModel &) override { + Cost = 1; + } +}; + +// // TWhiteboardReport // class TWhiteboardReport : public TRequestBase { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h index bafa957dc0..d2982f5fc5 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h @@ -153,6 +153,9 @@ struct TChunkState { LOG_RESERVED, LOG_COMMITTED, DATA_RESERVED_DELETE_ON_QUARANTINE, + DATA_DECOMMITTED, + DATA_RESERVED_DECOMMIT_IN_PROGRESS, + DATA_COMMITTED_DECOMMIT_IN_PROGRESS, }; ui64 Nonce; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp index 305cd7a8ea..3340d314fd 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp @@ -815,6 +815,85 @@ void TTestChunkDelete2::TestFSM(const TActorContext &ctx) { TestStep += 10; } +void TTestChunkForget1::TestFSM(const TActorContext &ctx) { + constexpr ui32 toReserve = 535; + VERBOSE_COUT("Test step " << TestStep); + switch (TestStep) { + case 0: + ASSERT_YTHROW(LastResponse.Status == NKikimrProto::OK, StatusToString(LastResponse.Status)); + VERBOSE_COUT(" Sending TEvInit"); + ctx.Send(Yard, new NPDisk::TEvYardInit(2, VDiskID, *PDiskGuid)); + break; + case 10: + { + TEST_RESPONSE(EvYardInitResult, OK); + Owner = LastResponse.Owner; + OwnerRound = LastResponse.OwnerRound; + VERBOSE_COUT(" Sending TEvChunkReserve"); + ctx.Send(Yard, new NPDisk::TEvChunkReserve(Owner, OwnerRound, toReserve)); + break; + case 20: + TEST_RESPONSE(EvChunkReserveResult, OK); + ASSERT_YTHROW(LastResponse.ChunkIds.size() == toReserve, + "Unexpected ChunkIds.size() == " << LastResponse.ChunkIds.size()); + ReservedChunks = LastResponse.ChunkIds; + VERBOSE_COUT(" Sending TEvLog to commit Chunks"); + for (ui32 i = 0; i < ReservedChunks.size(); ++i) { + VERBOSE_COUT(" id = " << ReservedChunks[i]); + } + CommitData = TString::Uninitialized(sizeof(ui32) * ReservedChunks.size()); + memcpy((void*)CommitData.data(), &(ReservedChunks[0]), sizeof(ui32) * ReservedChunks.size()); + NPDisk::TCommitRecord commitRecord; + commitRecord.CommitChunks = ReservedChunks; + commitRecord.IsStartingPoint = true; + ctx.Send(Yard, new NPDisk::TEvLog(Owner, OwnerRound, 0, commitRecord, CommitData, TLsnSeg(1, 1), + (void*)43)); + break; + } + case 30: + TEST_RESPONSE(EvLogResult, OK); + VERBOSE_COUT(" Sending TEvReserve to make sure no more chunks can be reserved"); + ctx.Send(Yard, new NPDisk::TEvChunkReserve(Owner, OwnerRound, toReserve)); + break; + case 40: + TEST_RESPONSE(EvChunkReserveResult, OUT_OF_SPACE); + VERBOSE_COUT(" Sending TEvLog to delete Chunks"); + CommitData = TString(); + { + NPDisk::TCommitRecord commitRecord; + commitRecord.DeleteChunks = ReservedChunks; + commitRecord.IsStartingPoint = true; + commitRecord.DeleteToDecommitted = true; + ctx.Send(Yard, new NPDisk::TEvLog(Owner, OwnerRound, 0, commitRecord, CommitData, TLsnSeg(2, 2), + (void*)43)); + break; + } + case 50: + TEST_RESPONSE(EvLogResult, OK); + VERBOSE_COUT(" Sending TEvReserve to make sure no more chunks can be reserved"); + ctx.Send(Yard, new NPDisk::TEvChunkReserve(Owner, OwnerRound, toReserve)); + break; + case 60: + TEST_RESPONSE(EvChunkReserveResult, OUT_OF_SPACE); + ctx.Send(Yard, new NPDisk::TEvChunkForget(Owner, OwnerRound, ReservedChunks)); + break; + case 70: + TEST_RESPONSE(EvChunkForgetResult, OK); + VERBOSE_COUT(" Sending TEvReserve to make sure some chunks can be reserved"); + ctx.Send(Yard, new NPDisk::TEvChunkReserve(Owner, OwnerRound, toReserve)); + break; + case 80: + TEST_RESPONSE(EvChunkReserveResult, OK); + VERBOSE_COUT("Done"); + SignalDoneEvent(); + break; + default: + ythrow TWithBackTrace<yexception>() << "Unexpected TestStep " << TestStep << Endl; + break; + } + TestStep += 10; +} + void TTestInitStartingPoints::TestFSM(const TActorContext &ctx) { TString data("testdata"); VERBOSE_COUT("Test step " << TestStep); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.h index 7eeb5db0bb..1c230ebc7f 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.h @@ -1030,6 +1030,20 @@ public: {} }; +class TTestChunkForget1 : public TBaseTest { + NPDisk::TOwner Owner; + NPDisk::TOwnerRound OwnerRound; + TString ChunkWriteData; + TString CommitData; + TVector<ui32> ReservedChunks; + + void TestFSM(const TActorContext &ctx); +public: + TTestChunkForget1(const TIntrusivePtr<TTestConfig> &cfg) + : TBaseTest(cfg) + {} +}; + template<ui32 WishDataSize> class TTestChunk3WriteRead : public TBaseTest { NPDisk::TOwner Owner; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_base_test.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_base_test.h index e6c75e6000..d7d0cd3522 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_base_test.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_base_test.h @@ -268,6 +268,16 @@ protected: ActTestFSM(ctx); } + void Handle(NPDisk::TEvChunkForgetResult::TPtr &ev, const TActorContext &ctx) { + NPDisk::TEvChunkForgetResult &result = *(ev->Get()); + LastResponse.Status = result.Status; + LastResponse.EventType = (TEvBlobStorage::EEv)result.Type(); + LastResponse.StatusFlags = result.StatusFlags; + VERBOSE_COUT("Got " << result.ToString()); + ActTestFSM(ctx); + } + + void Handle(NPDisk::TEvChunksLockResult::TPtr &ev, const TActorContext &ctx) { NPDisk::TEvChunksLockResult &result = *(ev->Get()); LastResponse.Status = result.Status; @@ -368,6 +378,7 @@ public: HFunc(NPDisk::TEvSlayResult, Handle); HFunc(NPDisk::TEvYardControlResult, Handle); HFunc(NPDisk::TEvCutLog, Handle); + HFunc(NPDisk::TEvChunkForgetResult, Handle); HFunc(NMon::TEvHttpInfoRes, Handle); HFunc(NNodeWhiteboard::TEvWhiteboard::TEvPDiskStateUpdate, Handle); HFunc(NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate, Handle); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp index b8ea34e9d9..8aa8d23ef3 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp @@ -396,6 +396,11 @@ YARD_UNIT_TEST(TestChunkDelete) { Run<TTestChunkDelete2>(&tc, 1, MIN_CHUNK_SIZE); } +YARD_UNIT_TEST(TestChunkForget) { + TTestContext tc(false, true); + Run<TTestChunkForget1>(&tc, 1, MIN_CHUNK_SIZE); +} + YARD_UNIT_TEST(Test3HugeAsyncLog) { TTestContext tc(false, true); constexpr ui32 chunkSize = MIN_CHUNK_SIZE; |