aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcthulhu <cthulhu@ydb.tech>2022-08-31 10:13:43 +0300
committercthulhu <cthulhu@ydb.tech>2022-08-31 10:13:43 +0300
commitf834aa90cf558e4ede2fe69b1ffc7d7bb30a172a (patch)
tree336604e96c49de422496c69db5d34734268dbbde
parentd04c6ee6620c95efeef71dbf0cae8b3fbd8906fa (diff)
downloadydb-f834aa90cf558e4ede2fe69b1ffc7d7bb30a172a.tar.gz
Implement pdisk chunk decommit and forget,
Implement pdisk chunk decommit and forget,
-rw-r--r--ydb/core/base/blobstorage.h2
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk.h72
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp14
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp162
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h5
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp83
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp1
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h1
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h4
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h2
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h21
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h3
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp79
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.h14
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_base_test.h11
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp5
16 files changed, 443 insertions, 36 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 6e4275e9c6..d87a8f7297 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -683,6 +683,7 @@ struct TEvBlobStorage {
EvHugeUnlockChunks,
EvVDiskStatRequest,
EvGetLogoBlobRequest,
+ EvChunkForget,
EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
EvLogResult,
@@ -729,6 +730,7 @@ struct TEvBlobStorage {
EvHugeStatResult,
EvVDiskStatResponse,
EvGetLogoBlobResponse,
+ EvChunkForgetResult,
// internal proxy interface
EvUnusedLocal1 = EvPut + 10 * 512, // Not used. /// 268 637 184
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk.h
index 32ced7d704..b7ae8cc7a9 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk.h
@@ -26,10 +26,13 @@ struct TCommitRecord {
TVector<TChunkIdx> CommitChunks;
TVector<TChunkIdx> DeleteChunks;
bool IsStartingPoint;
+ bool DeleteToDecommitted; // 1 == set chunks to Decommitted state that requires a ChunkForget event or a restart
+ // the value of DeleteToDecommitted is not stored as a part of the commit record.
TCommitRecord()
: FirstLsnToKeep(0)
, IsStartingPoint(false)
+ , DeleteToDecommitted(false)
{}
void ValidateChunks(TVector<TChunkIdx> &chunks) {
@@ -62,6 +65,7 @@ struct TCommitRecord {
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&DeleteChunks[0], sizeof(DeleteChunks[0]) * DeleteChunks.size());
}
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&IsStartingPoint, sizeof(IsStartingPoint));
+ REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&DeleteToDecommitted, sizeof(DeleteToDecommitted));
}
TString ToString() const {
@@ -69,6 +73,7 @@ struct TCommitRecord {
str << "{CommitRecord";
str << " FirstLsnToKeep# " << FirstLsnToKeep;
str << " IsStartingPoint# " << IsStartingPoint;
+ str << " DeleteToDecommitted# " << DeleteToDecommitted;
str << " CommitChunks# ";
PrintChunks(str, CommitChunks);
str << " DeleteChunks# ";
@@ -81,8 +86,7 @@ struct TCommitRecord {
return sizeof(TCommitRecord) + (CommitChunks.size() + DeleteChunks.size()) * sizeof(ui32);
}
-protected:
- void PrintChunks(IOutputStream &str, const TVector<TChunkIdx> vec) const {
+ static void PrintChunks(IOutputStream &str, const TVector<TChunkIdx> &vec) {
str << "[";
for (ui32 i = 0; i < vec.size(); i++) {
if (i)
@@ -639,6 +643,70 @@ struct TEvChunkReserveResult : public TEventLocal<TEvChunkReserveResult, TEvBlob
};
////////////////////////////////////////////////////////////////////////////
+// CHUNK FORGET
+////////////////////////////////////////////////////////////////////////////
+struct TEvChunkForget : public TEventLocal<TEvChunkForget, TEvBlobStorage::EvChunkForget> {
+ TOwner Owner;
+ TOwnerRound OwnerRound;
+ TVector<TChunkIdx> ForgetChunks;
+
+ TEvChunkForget(TOwner owner, TOwnerRound ownerRound)
+ : Owner(owner)
+ , OwnerRound(ownerRound)
+ {}
+
+ TEvChunkForget(TOwner owner, TOwnerRound ownerRound, TVector<TChunkIdx> forgetChunks)
+ : Owner(owner)
+ , OwnerRound(ownerRound)
+ , ForgetChunks(std::move(forgetChunks))
+ {}
+
+ TString ToString() const {
+ return ToString(*this);
+ }
+
+ static TString ToString(const TEvChunkForget &record) {
+ TStringStream str;
+ str << "{EvChunkForget ownerId# " << (ui32)record.Owner;
+ str << " ownerRound# " << record.OwnerRound;
+ str << " ForgetChunks# ";
+ TCommitRecord::PrintChunks(str, record.ForgetChunks);
+ str << "}";
+ return str.Str();
+ }
+};
+
+struct TEvChunkForgetResult : public TEventLocal<TEvChunkForgetResult, TEvBlobStorage::EvChunkForgetResult> {
+ NKikimrProto::EReplyStatus Status;
+ TStatusFlags StatusFlags;
+ TString ErrorReason;
+
+ TEvChunkForgetResult(NKikimrProto::EReplyStatus status, TStatusFlags statusFlags)
+ : Status(status)
+ , StatusFlags(statusFlags)
+ {}
+
+ TEvChunkForgetResult(NKikimrProto::EReplyStatus status, TStatusFlags statusFlags, TString &errorReason)
+ : Status(status)
+ , StatusFlags(statusFlags)
+ , ErrorReason(errorReason)
+ {}
+
+ TString ToString() const {
+ return ToString(*this);
+ }
+
+ static TString ToString(const TEvChunkForgetResult &record) {
+ TStringStream str;
+ str << "{EvChunkForgetResult Status# " << NKikimrProto::EReplyStatus_Name(record.Status).data();
+ str << " ErrorReason# \"" << record.ErrorReason << "\"";
+ str << " StatusFlags# " << StatusFlagsToString(record.StatusFlags);
+ str << "}";
+ return str.Str();
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////
// CHUNK READ
////////////////////////////////////////////////////////////////////////////
struct TEvChunkRead : public TEventLocal<TEvChunkRead, TEvBlobStorage::EvChunkRead> {
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
index ec4ff19681..f3951b855a 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
@@ -623,6 +623,12 @@ public:
PDisk->Mon.ChunkReserve.CountResponse();
}
+ void ErrorHandle(NPDisk::TEvChunkForget::TPtr &ev) {
+ PDisk->Mon.ChunkForget.CountRequest();
+ Send(ev->Sender, new NPDisk::TEvChunkForgetResult(NKikimrProto::CORRUPTED, 0, StateErrorReason));
+ PDisk->Mon.ChunkForget.CountResponse();
+ }
+
void ErrorHandle(NPDisk::TEvYardControl::TPtr &ev) {
const NPDisk::TEvYardControl &evControl = *ev->Get();
Y_VERIFY(PDisk);
@@ -727,6 +733,11 @@ public:
PDisk->InputRequest(request);
}
+ void Handle(NPDisk::TEvChunkForget::TPtr &ev) {
+ auto* request = PDisk->ReqCreator.CreateFromEv<TChunkForget>(*ev->Get(), ev->Sender);
+ PDisk->InputRequest(request);
+ }
+
void Handle(NPDisk::TEvChunksLock::TPtr &ev) {
auto* request = PDisk->ReqCreator.CreateFromEv<TChunksLock>(*ev->Get(), ev->Sender);
PDisk->InputRequest(request);
@@ -1001,6 +1012,7 @@ public:
hFunc(NPDisk::TEvHarakiri, ErrorHandle);
hFunc(NPDisk::TEvSlay, InitHandle);
hFunc(NPDisk::TEvChunkReserve, ErrorHandle);
+ hFunc(NPDisk::TEvChunkForget, ErrorHandle);
hFunc(NPDisk::TEvYardControl, InitHandle);
hFunc(NPDisk::TEvAskForCutLog, ErrorHandle);
hFunc(NPDisk::TEvWhiteboardReportResult, Handle);
@@ -1030,6 +1042,7 @@ public:
hFunc(NPDisk::TEvHarakiri, Handle);
hFunc(NPDisk::TEvSlay, Handle);
hFunc(NPDisk::TEvChunkReserve, Handle);
+ hFunc(NPDisk::TEvChunkForget, Handle);
hFunc(NPDisk::TEvChunksLock, Handle);
hFunc(NPDisk::TEvChunksUnlock, Handle);
hFunc(NPDisk::TEvYardControl, Handle);
@@ -1059,6 +1072,7 @@ public:
hFunc(NPDisk::TEvHarakiri, ErrorHandle);
hFunc(NPDisk::TEvSlay, ErrorHandle);
hFunc(NPDisk::TEvChunkReserve, ErrorHandle);
+ hFunc(NPDisk::TEvChunkForget, ErrorHandle);
hFunc(NPDisk::TEvYardControl, ErrorHandle);
hFunc(NPDisk::TEvAskForCutLog, ErrorHandle);
hFunc(NPDisk::TEvWhiteboardReportResult, Handle);
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
index 937e44ec9b..54c671d312 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
@@ -85,6 +85,7 @@ TPDisk::TPDisk(const TIntrusivePtr<TPDiskConfig> cfg, const TIntrusivePtr<::NMon
JointChunkWrites.reserve(16 << 10);
JointLogWrites.reserve(16 << 10);
JointCommits.reserve(16 << 10);
+ JointChunkForgets.reserve(16 << 10);
}
TString TPDisk::DynamicStateToString(bool isMultiline) {
@@ -332,6 +333,7 @@ void TPDisk::Stop() {
}
JointLogWrites.clear();
JointCommits.clear();
+ JointChunkForgets.clear();
for (const auto& req : FastOperationsQueue) {
TRequestBase::AbortDelete(req.get(), ActorSystem);
}
@@ -1159,6 +1161,118 @@ void TPDisk::ChunkReserve(TChunkReserve &evChunkReserve) {
guard.Release();
ActorSystem->Send(evChunkReserve.Sender, result.Release());
Mon.ChunkReserve.CountResponse();
+
+}
+bool TPDisk::ValidateForgetChunk(ui32 chunkIdx, TOwner owner, TStringStream& outErrorReason) {
+ TGuard<TMutex> guard(StateMutex);
+ if (chunkIdx >= ChunkState.size()) {
+ outErrorReason << "PDiskId# " << PDiskId
+ << " Can't forget chunkIdx# " << chunkIdx
+ << " > total# " << ChunkState.size()
+ << " ownerId# " << owner
+ << " Marker# BPD89";
+ LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_PDISK, outErrorReason.Str());
+ return false;
+ }
+ if (ChunkState[chunkIdx].OwnerId != owner) {
+ outErrorReason << "PDiskId# " << PDiskId
+ << " Can't forget chunkIdx# " << chunkIdx
+ << ", ownerId# " << owner
+ << " != real ownerId# " << ChunkState[chunkIdx].OwnerId
+ << " Marker# BPD90";
+ LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_PDISK, outErrorReason.Str());
+ return false;
+ }
+ if (ChunkState[chunkIdx].CommitState != TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS
+ && ChunkState[chunkIdx].CommitState != TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS
+ && ChunkState[chunkIdx].CommitState != TChunkState::DATA_DECOMMITTED) {
+ outErrorReason << "PDiskId# " << PDiskId
+ << " Can't forget chunkIdx# " << chunkIdx
+ << " in CommitState# " << ChunkState[chunkIdx].CommitState
+ << " ownerId# " << owner << " Marker# BPD91";
+ LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_PDISK, outErrorReason.Str());
+ return false;
+ }
+ return true;
+}
+
+void TPDisk::ChunkForget(TChunkForget &evChunkForget) {
+ TStringStream errorReason;
+ TGuard<TMutex> guard(StateMutex);
+
+ THolder<NPDisk::TEvChunkForgetResult> result;
+
+ bool isOk = true;
+
+ for (ui32 chunkIdx : evChunkForget.ForgetChunks) {
+ if (!ValidateForgetChunk(chunkIdx, evChunkForget.Owner, errorReason)) {
+ result = MakeHolder<NPDisk::TEvChunkForgetResult>(NKikimrProto::ERROR,
+ NotEnoughDiskSpaceStatusFlags(evChunkForget.Owner, evChunkForget.OwnerGroupType),
+ errorReason.Str());
+ isOk = false;
+ break;
+ }
+ }
+ if (isOk) {
+ for (ui32 chunkIdx : evChunkForget.ForgetChunks) {
+ TChunkState& state = ChunkState[chunkIdx];
+ if (state.HasAnyOperationsInProgress()) {
+ switch (state.CommitState) {
+ case TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS:
+ Mon.UncommitedDataChunks->Dec();
+ state.CommitState = TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE;
+ QuarantineChunks.push_back(chunkIdx);
+ break;
+ case TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS:
+ Mon.UncommitedDataChunks->Dec();
+ state.CommitState = TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE;
+ QuarantineChunks.push_back(chunkIdx);
+ break;
+ case TChunkState::DATA_DECOMMITTED:
+ Mon.UncommitedDataChunks->Dec();
+ state.CommitState = TChunkState::DATA_ON_QUARANTINE;
+ QuarantineChunks.push_back(chunkIdx);
+ break;
+ default:
+ Y_FAIL_S("PDiskId# " << PDiskId
+ << " ChunkForget with in flight, ownerId# " << (ui32)evChunkForget.Owner
+ << " chunkIdx# " << chunkIdx << " unexpected commitState# " << state.CommitState);
+ }
+ } else {
+ switch (state.CommitState) {
+ case TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS:
+ Mon.UncommitedDataChunks->Dec();
+ state.CommitState = TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS;
+ break;
+ case TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS:
+ Mon.UncommitedDataChunks->Dec();
+ state.CommitState = TChunkState::DATA_COMMITTED_DELETE_IN_PROGRESS;
+ break;
+ case TChunkState::DATA_DECOMMITTED:
+ Y_VERIFY_S(state.CommitsInProgress == 0,
+ "PDiskId# " << PDiskId << " chunkIdx# " << chunkIdx << " state# " << state.ToString());
+ LOG_INFO(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " chunkIdx# %" PRIu32
+ " forgotten, ownerId# %" PRIu32 " -> %" PRIu32,
+ (ui32)PDiskId, (ui32)chunkIdx, (ui32)state.OwnerId, (ui32)OwnerUnallocated);
+ Y_VERIFY(state.OwnerId == evChunkForget.Owner);
+ Mon.UncommitedDataChunks->Dec();
+ state.CommitState = TChunkState::FREE;
+ Keeper.PushFreeOwnerChunk(evChunkForget.Owner, chunkIdx);
+ break;
+ default:
+ Y_FAIL_S("PDiskId# " << PDiskId
+ << " ChunkForget, ownerId# " << (ui32)evChunkForget.Owner
+ << " chunkIdx# " << chunkIdx << " unexpected commitState# " << state.CommitState);
+ }
+ }
+ }
+ result = MakeHolder<NPDisk::TEvChunkForgetResult>(NKikimrProto::OK, 0);
+ result->StatusFlags = GetStatusFlags(evChunkForget.Owner, evChunkForget.OwnerGroupType);
+ }
+
+ guard.Release();
+ ActorSystem->Send(evChunkForget.Sender, result.Release());
+ Mon.ChunkForget.CountResponse();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1692,7 +1806,10 @@ void TPDisk::KillOwner(TOwner owner, TOwnerRound killOwnerRound, TCompletionEven
for (ui32 i = 0; i < ChunkState.size(); ++i) {
TChunkState &state = ChunkState[i];
if (state.OwnerId == owner) {
- if (state.CommitState == TChunkState::DATA_RESERVED) {
+ if (state.CommitState == TChunkState::DATA_RESERVED
+ || state.CommitState == TChunkState::DATA_DECOMMITTED
+ || state.CommitState == TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS
+ || state.CommitState == TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS) {
Mon.UncommitedDataChunks->Dec();
} else if (state.CommitState == TChunkState::DATA_COMMITTED) {
Mon.CommitedDataChunks->Dec();
@@ -1712,14 +1829,23 @@ void TPDisk::KillOwner(TOwner owner, TOwnerRound killOwnerRound, TCompletionEven
|| state.CommitState == TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS
|| state.CommitState == TChunkState::DATA_COMMITTED_DELETE_IN_PROGRESS
|| state.CommitState == TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE
- || state.CommitState == TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE) {
+ || state.CommitState == TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE
+ || state.CommitState == TChunkState::DATA_DECOMMITTED
+ || state.CommitState == TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS
+ || state.CommitState == TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS
+ ) {
if (state.CommitState == TChunkState::DATA_RESERVED
- || state.CommitState == TChunkState::DATA_COMMITTED) {
+ || state.CommitState == TChunkState::DATA_COMMITTED
+ || state.CommitState == TChunkState::DATA_DECOMMITTED) {
state.CommitState = TChunkState::DATA_ON_QUARANTINE;
} else if (state.CommitState == TChunkState::DATA_COMMITTED_DELETE_IN_PROGRESS) {
state.CommitState = TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS;
} else if (state.CommitState == TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE) {
state.CommitState = TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE;
+ } else if (state.CommitState == TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS
+ || state.CommitState == TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS) {
+ state.CommitState = TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE;
+ QuarantineChunks.push_back(i);
}
if (state.CommitState != TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE
@@ -2102,13 +2228,24 @@ void TPDisk::ProcessFastOperationsQueue() {
MarkChunksAsReleased(static_cast<TReleaseChunks&>(*req));
break;
default:
- Y_FAIL();
+ Y_FAIL_S("Unexpected request type# " << (ui64)req->GetType());
break;
}
}
FastOperationsQueue.clear();
}
+void TPDisk::ProcessChunkForgetQueue() {
+ if (JointChunkForgets.empty())
+ return;
+
+ for (auto& req : JointChunkForgets) {
+ ChunkForget(*req);
+ }
+
+ JointChunkForgets.clear();
+}
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Drive info and write cache
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -2561,6 +2698,14 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {
ownerData.WriteThroughput.Increment(log->Data.size(), ActorSystem->Timestamp());
break;
}
+ case ERequestType::RequestChunkForget:
+ {
+ auto *forget = static_cast<TChunkForget*>(request);
+ TOwnerData &ownerData = OwnerData[forget->Owner];
+ forget->SetOwnerGroupType(ownerData.IsStaticGroupOwner());
+ forget->JobKind = NSchLab::JobKindWrite;
+ break;
+ }
case ERequestType::RequestYardInit:
break;
case ERequestType::RequestCheckSpace:
@@ -2861,6 +3006,12 @@ void TPDisk::RouteRequest(TRequestBase *request) {
}
break;
}
+ case ERequestType::RequestChunkForget:
+ {
+ TChunkForget *forget = static_cast<TChunkForget*>(request);
+ JointChunkForgets.push_back(std::unique_ptr<TChunkForget>(forget));
+ break;
+ }
default:
Y_FAIL_S("RouteRequest, unexpected request type# " << ui64(request->GetType()));
break;
@@ -3080,7 +3231,7 @@ void TPDisk::Update() {
// Processing
bool isNonLogWorkloadPresent = !JointChunkWrites.empty() || !FastOperationsQueue.empty() ||
- !JointChunkReads.empty() || !JointLogReads.empty() || !JointChunkTrims.empty();
+ !JointChunkReads.empty() || !JointLogReads.empty() || !JointChunkTrims.empty() || !JointChunkForgets.empty();
bool isLogWorkloadPresent = !JointLogWrites.empty();
bool isNothingToDo = true;
if (isLogWorkloadPresent || isNonLogWorkloadPresent) {
@@ -3146,6 +3297,7 @@ void TPDisk::Update() {
if (tact != ETact::TactLc) {
ProcessLogWriteQueueAndCommits();
}
+ ProcessChunkForgetQueue();
LastTact = tact;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
index 37a86da126..e0704efdd9 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
@@ -73,7 +73,7 @@ public:
TVector<TLogWrite*> JointLogWrites;
TVector<TLogWrite*> JointCommits;
TVector<TChunkTrim*> JointChunkTrims;
-
+ TVector<std::unique_ptr<TChunkForget>> JointChunkForgets;
TVector<std::unique_ptr<TRequestBase>> FastOperationsQueue;
TDeque<TRequestBase*> PausedQueue;
std::set<std::unique_ptr<TYardInit>> PendingYardInits;
@@ -273,6 +273,8 @@ public:
// Chunk reservation
TVector<TChunkIdx> AllocateChunkForOwner(const TRequestBase *req, const ui32 count, TString &errorReason);
void ChunkReserve(TChunkReserve &evChunkReserve);
+ bool ValidateForgetChunk(ui32 chunkIdx, TOwner owner, TStringStream& outErrorReason);
+ void ChunkForget(TChunkForget &evChunkForget);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Whiteboard and HTTP reports creation
void WhiteboardReport(TWhiteboardReport &whiteboardReport); // Called by actor
@@ -313,6 +315,7 @@ public:
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Update process
void ProcessLogWriteQueueAndCommits();
+ void ProcessChunkForgetQueue();
void ProcessChunkWriteQueue();
void ProcessChunkReadQueue();
void ProcessLogReadQueue();
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
index 5db5cae52b..68aee602f8 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
@@ -866,44 +866,65 @@ NKikimrProto::EReplyStatus TPDisk::BeforeLoggingCommitRecord(const TLogWrite &lo
}
++ChunkState[chunkIdx].CommitsInProgress;
}
- for (ui32 chunkIdx : logWrite.CommitRecord.DeleteChunks) {
- TChunkState& state = ChunkState[chunkIdx];
- if (state.HasAnyOperationsInProgress()) {
+ if (logWrite.CommitRecord.DeleteToDecommitted) {
+ for (ui32 chunkIdx : logWrite.CommitRecord.DeleteChunks) {
+ TChunkState& state = ChunkState[chunkIdx];
switch (state.CommitState) {
case TChunkState::DATA_RESERVED:
- Mon.UncommitedDataChunks->Dec();
- state.CommitState = TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE;
+ state.CommitState = TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS;
break;
case TChunkState::DATA_COMMITTED:
Mon.CommitedDataChunks->Dec();
- LOG_DEBUG(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32
- " Line# %" PRIu32 " --CommitedDataChunks# %" PRIi64 " chunkIdx# %" PRIu32 " Marker# BPD10",
- (ui32)PDiskId, (ui32)__LINE__, (i64)Mon.CommitedDataChunks->Val(), (ui32)chunkIdx);
- state.CommitState = TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE;
+ Mon.UncommitedDataChunks->Inc();
+ state.CommitState = TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS;
break;
default:
- Y_FAIL_S("PDiskID# " << PDiskId << " can't delete chunkIdx# " << chunkIdx
+ Y_FAIL_S("PDiskID# " << PDiskId << " can't delete to decomitted chunkIdx# " << chunkIdx
<< " request ownerId# " << logWrite.Owner
- << " with operations in progress as it is in unexpected CommitState# " << state.ToString());
+ << " as it is in unexpected CommitState# " << state.ToString());
break;
}
- QuarantineChunks.push_back(chunkIdx);
- LOG_INFO_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDiskId
- << " push chunk on QuarantineChunks because it has operations in flight"
- << " chunkIdx# " << chunkIdx
- << " ownerId# " << logWrite.Owner
- << " state# " << state.ToString()
- << " Marker# BPD78");
- } else if (state.CommitState == TChunkState::DATA_RESERVED) {
- Mon.UncommitedDataChunks->Dec();
- state.CommitState = TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS;
- } else if (state.CommitState == TChunkState::DATA_COMMITTED) {
- Mon.CommitedDataChunks->Dec();
- state.CommitState = TChunkState::DATA_COMMITTED_DELETE_IN_PROGRESS;
- } else {
- Y_FAIL_S("PDiskID# " << PDiskId << " can't delete chunkIdx# " << chunkIdx
- << " request ownerId# " << logWrite.Owner
- << " as it is in unexpected CommitState# " << state.ToString());
+ }
+ } else {
+ for (ui32 chunkIdx : logWrite.CommitRecord.DeleteChunks) {
+ TChunkState& state = ChunkState[chunkIdx];
+ if (state.HasAnyOperationsInProgress()) {
+ switch (state.CommitState) {
+ case TChunkState::DATA_RESERVED:
+ Mon.UncommitedDataChunks->Dec();
+ state.CommitState = TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE;
+ break;
+ case TChunkState::DATA_COMMITTED:
+ Mon.CommitedDataChunks->Dec();
+ LOG_DEBUG(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32
+ " Line# %" PRIu32 " --CommitedDataChunks# %" PRIi64 " chunkIdx# %" PRIu32 " Marker# BPD10",
+ (ui32)PDiskId, (ui32)__LINE__, (i64)Mon.CommitedDataChunks->Val(), (ui32)chunkIdx);
+ state.CommitState = TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE;
+ break;
+ default:
+ Y_FAIL_S("PDiskID# " << PDiskId << " can't delete chunkIdx# " << chunkIdx
+ << " request ownerId# " << logWrite.Owner
+ << " with operations in progress as it is in unexpected CommitState# " << state.ToString());
+ break;
+ }
+ QuarantineChunks.push_back(chunkIdx);
+ LOG_INFO_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDiskId
+ << " push chunk on QuarantineChunks because it has operations in flight"
+ << " chunkIdx# " << chunkIdx
+ << " ownerId# " << logWrite.Owner
+ << " state# " << state.ToString()
+ << " Marker# BPD78");
+ } else if (state.CommitState == TChunkState::DATA_RESERVED) {
+ Mon.UncommitedDataChunks->Dec();
+ state.CommitState = TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS;
+ } else if (state.CommitState == TChunkState::DATA_COMMITTED) {
+ Mon.CommitedDataChunks->Dec();
+ state.CommitState = TChunkState::DATA_COMMITTED_DELETE_IN_PROGRESS;
+ } else {
+ Y_FAIL_S("PDiskID# " << PDiskId << " can't delete chunkIdx# " << chunkIdx
+ << " request ownerId# " << logWrite.Owner
+ << " as it is in unexpected CommitState# " << state.ToString());
+ }
}
}
@@ -1034,6 +1055,12 @@ void TPDisk::DeleteChunk(ui32 chunkIdx, TOwner owner) {
Y_VERIFY(state.OwnerId == owner); // TODO DELETE
state.CommitState = TChunkState::DATA_ON_QUARANTINE;
break;
+ case TChunkState::DATA_COMMITTED_DECOMMIT_IN_PROGRESS:
+ [[fallthrough]];
+ case TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS:
+ state.CommitState = TChunkState::DATA_DECOMMITTED;
+ break;
+
default:
Y_FAIL_S("PDiskID# " << PDiskId << " can't delete chunkIdx# " << chunkIdx
<< " requesting ownerId# " << owner
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp
index 7417cd837f..a19631f8ed 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp
@@ -188,6 +188,7 @@ TPDiskMon::TPDiskMon(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& count
CheckSpace.Setup(PDiskGroup, "YardCheckSpace");
YardConfigureScheduler.Setup(PDiskGroup, "YardConfigureScheduler");
ChunkReserve.Setup(PDiskGroup, "YardChunkReserve");
+ ChunkForget.Setup(PDiskGroup, "YardChunkForget");
Harakiri.Setup(PDiskGroup, "YardHarakiri");
YardSlay.Setup(PDiskGroup, "YardSlay");
YardControl.Setup(PDiskGroup, "YardControl");
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h
index 82efac7bad..84fd08d685 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h
@@ -710,6 +710,7 @@ struct TPDiskMon {
TReqCounters CheckSpace;
TReqCounters YardConfigureScheduler;
TReqCounters ChunkReserve;
+ TReqCounters ChunkForget;
TReqCounters Harakiri;
TReqCounters YardSlay;
TReqCounters YardControl;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h
index 737ad571c7..88adf2d6e1 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h
@@ -52,6 +52,10 @@ private:
request->GateId = GateLog;
request->IsSensitive = true;
return;
+ case ERequestType::RequestChunkForget:
+ request->GateId = GateLog;
+ request->IsSensitive = true;
+ return;
case ERequestType::RequestChunkRead:
request->IsFast = (request->PriorityClass == NPriRead::HullOnlineOther);
request->IsSensitive = (request->PriorityClass == NPriRead::HullOnlineRt);
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h
index c0824b6002..21d680884f 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h
@@ -82,6 +82,7 @@ struct TReqId {
TryTrimChunk = 63,
ReleaseChunks = 64,
StopDevice = 65,
+ ChunkForget = 66,
};
// 56 bit idx, 8 bit source
@@ -139,6 +140,7 @@ enum class ERequestType {
RequestTryTrimChunk,
RequestReleaseChunks,
RequestStopDevice,
+ RequestChunkForget,
};
inline IOutputStream& operator <<(IOutputStream& out, const TReqId& reqId) {
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h
index 61c0996e08..9d7797adbf 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h
@@ -667,6 +667,27 @@ public:
};
//
+// TChunkForget
+//
+class TChunkForget : public TRequestBase {
+public:
+ TVector<TChunkIdx> ForgetChunks;
+
+ TChunkForget(const NPDisk::TEvChunkForget &ev, const TActorId &sender, TAtomicBase reqIdx)
+ : TRequestBase(sender, TReqId(TReqId::ChunkForget, reqIdx), ev.Owner, ev.OwnerRound, NPriInternal::LogWrite)
+ , ForgetChunks(std::move(ev.ForgetChunks))
+ {}
+
+ ERequestType GetType() const override {
+ return ERequestType::RequestChunkForget;
+ }
+
+ void EstimateCost(const TDriveModel &) override {
+ Cost = 1;
+ }
+};
+
+//
// TWhiteboardReport
//
class TWhiteboardReport : public TRequestBase {
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
index bafa957dc0..d2982f5fc5 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
@@ -153,6 +153,9 @@ struct TChunkState {
LOG_RESERVED,
LOG_COMMITTED,
DATA_RESERVED_DELETE_ON_QUARANTINE,
+ DATA_DECOMMITTED,
+ DATA_RESERVED_DECOMMIT_IN_PROGRESS,
+ DATA_COMMITTED_DECOMMIT_IN_PROGRESS,
};
ui64 Nonce;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp
index 305cd7a8ea..3340d314fd 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp
@@ -815,6 +815,85 @@ void TTestChunkDelete2::TestFSM(const TActorContext &ctx) {
TestStep += 10;
}
+void TTestChunkForget1::TestFSM(const TActorContext &ctx) {
+ constexpr ui32 toReserve = 535;
+ VERBOSE_COUT("Test step " << TestStep);
+ switch (TestStep) {
+ case 0:
+ ASSERT_YTHROW(LastResponse.Status == NKikimrProto::OK, StatusToString(LastResponse.Status));
+ VERBOSE_COUT(" Sending TEvInit");
+ ctx.Send(Yard, new NPDisk::TEvYardInit(2, VDiskID, *PDiskGuid));
+ break;
+ case 10:
+ {
+ TEST_RESPONSE(EvYardInitResult, OK);
+ Owner = LastResponse.Owner;
+ OwnerRound = LastResponse.OwnerRound;
+ VERBOSE_COUT(" Sending TEvChunkReserve");
+ ctx.Send(Yard, new NPDisk::TEvChunkReserve(Owner, OwnerRound, toReserve));
+ break;
+ case 20:
+ TEST_RESPONSE(EvChunkReserveResult, OK);
+ ASSERT_YTHROW(LastResponse.ChunkIds.size() == toReserve,
+ "Unexpected ChunkIds.size() == " << LastResponse.ChunkIds.size());
+ ReservedChunks = LastResponse.ChunkIds;
+ VERBOSE_COUT(" Sending TEvLog to commit Chunks");
+ for (ui32 i = 0; i < ReservedChunks.size(); ++i) {
+ VERBOSE_COUT(" id = " << ReservedChunks[i]);
+ }
+ CommitData = TString::Uninitialized(sizeof(ui32) * ReservedChunks.size());
+ memcpy((void*)CommitData.data(), &(ReservedChunks[0]), sizeof(ui32) * ReservedChunks.size());
+ NPDisk::TCommitRecord commitRecord;
+ commitRecord.CommitChunks = ReservedChunks;
+ commitRecord.IsStartingPoint = true;
+ ctx.Send(Yard, new NPDisk::TEvLog(Owner, OwnerRound, 0, commitRecord, CommitData, TLsnSeg(1, 1),
+ (void*)43));
+ break;
+ }
+ case 30:
+ TEST_RESPONSE(EvLogResult, OK);
+ VERBOSE_COUT(" Sending TEvReserve to make sure no more chunks can be reserved");
+ ctx.Send(Yard, new NPDisk::TEvChunkReserve(Owner, OwnerRound, toReserve));
+ break;
+ case 40:
+ TEST_RESPONSE(EvChunkReserveResult, OUT_OF_SPACE);
+ VERBOSE_COUT(" Sending TEvLog to delete Chunks");
+ CommitData = TString();
+ {
+ NPDisk::TCommitRecord commitRecord;
+ commitRecord.DeleteChunks = ReservedChunks;
+ commitRecord.IsStartingPoint = true;
+ commitRecord.DeleteToDecommitted = true;
+ ctx.Send(Yard, new NPDisk::TEvLog(Owner, OwnerRound, 0, commitRecord, CommitData, TLsnSeg(2, 2),
+ (void*)43));
+ break;
+ }
+ case 50:
+ TEST_RESPONSE(EvLogResult, OK);
+ VERBOSE_COUT(" Sending TEvReserve to make sure no more chunks can be reserved");
+ ctx.Send(Yard, new NPDisk::TEvChunkReserve(Owner, OwnerRound, toReserve));
+ break;
+ case 60:
+ TEST_RESPONSE(EvChunkReserveResult, OUT_OF_SPACE);
+ ctx.Send(Yard, new NPDisk::TEvChunkForget(Owner, OwnerRound, ReservedChunks));
+ break;
+ case 70:
+ TEST_RESPONSE(EvChunkForgetResult, OK);
+ VERBOSE_COUT(" Sending TEvReserve to make sure some chunks can be reserved");
+ ctx.Send(Yard, new NPDisk::TEvChunkReserve(Owner, OwnerRound, toReserve));
+ break;
+ case 80:
+ TEST_RESPONSE(EvChunkReserveResult, OK);
+ VERBOSE_COUT("Done");
+ SignalDoneEvent();
+ break;
+ default:
+ ythrow TWithBackTrace<yexception>() << "Unexpected TestStep " << TestStep << Endl;
+ break;
+ }
+ TestStep += 10;
+}
+
void TTestInitStartingPoints::TestFSM(const TActorContext &ctx) {
TString data("testdata");
VERBOSE_COUT("Test step " << TestStep);
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.h
index 7eeb5db0bb..1c230ebc7f 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.h
@@ -1030,6 +1030,20 @@ public:
{}
};
+class TTestChunkForget1 : public TBaseTest {
+ NPDisk::TOwner Owner;
+ NPDisk::TOwnerRound OwnerRound;
+ TString ChunkWriteData;
+ TString CommitData;
+ TVector<ui32> ReservedChunks;
+
+ void TestFSM(const TActorContext &ctx);
+public:
+ TTestChunkForget1(const TIntrusivePtr<TTestConfig> &cfg)
+ : TBaseTest(cfg)
+ {}
+};
+
template<ui32 WishDataSize>
class TTestChunk3WriteRead : public TBaseTest {
NPDisk::TOwner Owner;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_base_test.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_base_test.h
index e6c75e6000..d7d0cd3522 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_base_test.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_base_test.h
@@ -268,6 +268,16 @@ protected:
ActTestFSM(ctx);
}
+ void Handle(NPDisk::TEvChunkForgetResult::TPtr &ev, const TActorContext &ctx) {
+ NPDisk::TEvChunkForgetResult &result = *(ev->Get());
+ LastResponse.Status = result.Status;
+ LastResponse.EventType = (TEvBlobStorage::EEv)result.Type();
+ LastResponse.StatusFlags = result.StatusFlags;
+ VERBOSE_COUT("Got " << result.ToString());
+ ActTestFSM(ctx);
+ }
+
+
void Handle(NPDisk::TEvChunksLockResult::TPtr &ev, const TActorContext &ctx) {
NPDisk::TEvChunksLockResult &result = *(ev->Get());
LastResponse.Status = result.Status;
@@ -368,6 +378,7 @@ public:
HFunc(NPDisk::TEvSlayResult, Handle);
HFunc(NPDisk::TEvYardControlResult, Handle);
HFunc(NPDisk::TEvCutLog, Handle);
+ HFunc(NPDisk::TEvChunkForgetResult, Handle);
HFunc(NMon::TEvHttpInfoRes, Handle);
HFunc(NNodeWhiteboard::TEvWhiteboard::TEvPDiskStateUpdate, Handle);
HFunc(NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate, Handle);
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp
index b8ea34e9d9..8aa8d23ef3 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_yard.cpp
@@ -396,6 +396,11 @@ YARD_UNIT_TEST(TestChunkDelete) {
Run<TTestChunkDelete2>(&tc, 1, MIN_CHUNK_SIZE);
}
+YARD_UNIT_TEST(TestChunkForget) {
+ TTestContext tc(false, true);
+ Run<TTestChunkForget1>(&tc, 1, MIN_CHUNK_SIZE);
+}
+
YARD_UNIT_TEST(Test3HugeAsyncLog) {
TTestContext tc(false, true);
constexpr ui32 chunkSize = MIN_CHUNK_SIZE;