aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@ydb.tech>2025-01-28 18:38:04 +0300
committerGitHub <noreply@github.com>2025-01-28 15:38:04 +0000
commitfaa5f83347e212beb824e53cde09ea6b97f16e39 (patch)
tree362b5752e6934f70a2ee9f2fa9f6eab559810539
parent30de7128afc75edae241e313aac2c54a0d9b9f02 (diff)
downloadydb-faa5f83347e212beb824e53cde09ea6b97f16e39.tar.gz
Fix shred interface in PDisk (#13926)
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp12
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp38
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h2
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h1
4 files changed, 30 insertions, 23 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
index bab180b275..8b0c3ca485 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
@@ -65,11 +65,13 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
ui32 SlotId = 0;
bool IsShred = false;
ui64 ShredGeneration = 0;
+ ui64 Cookie;
- TInitQueueItem(const TActorId sender, const ui64 shredGeneration)
+ TInitQueueItem(const TActorId sender, const ui64 shredGeneration, ui64 cookie)
: Sender(sender)
, IsShred(true)
, ShredGeneration(shredGeneration)
+ , Cookie(cookie)
{}
TInitQueueItem(TOwnerRound ownerRound, TVDiskID vDisk, ui64 pDiskGuid, TActorId sender, TActorId cutLogId,
@@ -299,7 +301,7 @@ public:
Become(&TThis::StateError);
for (TList<TInitQueueItem>::iterator it = InitQueue.begin(); it != InitQueue.end(); ++it) {
if (it->IsShred) {
- Send(it->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, it->ShredGeneration, errorReason));
+ Send(it->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, it->ShredGeneration, errorReason), 0, it->Cookie);
if (PDisk) {
PDisk->Mon.ShredPDisk.CountResponse();
}
@@ -617,6 +619,7 @@ public:
if (it->IsShred) {
NPDisk::TEvShredPDisk evShredPDisk(it->ShredGeneration);
auto* request = PDisk->ReqCreator.CreateFromEv<NPDisk::TShredPDisk>(evShredPDisk, it->Sender);
+ request->Cookie = it->Cookie;
PDisk->InputRequest(request);
} else {
NPDisk::TEvYardInit evInit(it->OwnerRound, it->VDisk, it->PDiskGuid, it->CutLogId, it->WhiteboardProxyId,
@@ -667,7 +670,7 @@ public:
void InitHandle(NPDisk::TEvShredPDisk::TPtr &ev) {
const NPDisk::TEvShredPDisk &evShredPDisk = *ev->Get();
- InitQueue.emplace_back(ev->Sender, evShredPDisk.ShredGeneration);
+ InitQueue.emplace_back(ev->Sender, evShredPDisk.ShredGeneration, ev->Cookie);
}
void InitHandle(NPDisk::TEvPreShredCompactVDiskResult::TPtr &ev) {
@@ -843,7 +846,7 @@ public:
void ErrorHandle(NPDisk::TEvShredPDisk::TPtr &ev) {
// Respond with error, can't shred in this state.
- Send(ev->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, 0, StateErrorReason));
+ Send(ev->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, 0, StateErrorReason), 0, ev->Cookie);
}
void ErrorHandle(NPDisk::TEvPreShredCompactVDiskResult::TPtr &ev) {
@@ -1060,6 +1063,7 @@ public:
void Handle(NPDisk::TEvShredPDisk::TPtr &ev) {
auto* request = PDisk->ReqCreator.CreateFromEv<TShredPDisk>(*ev->Get(), ev->Sender);
+ request->Cookie = ev->Cookie;
PDisk->InputRequest(request);
}
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
index f3c157bc11..95ff2dc59e 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
@@ -3935,7 +3935,7 @@ void TPDisk::ProgressShredState() {
<< " sends compact request to VDisk# " << data.VDiskId
<< " ownerId# " << ownerId
<< " request# " << compactRequest->ToString());
- PCtx->ActorSystem->Send(data.CutLogId, compactRequest.Release());
+ PCtx->ActorSystem->Send(new IEventHandle(data.CutLogId, PCtx->PDiskActor, compactRequest.Release()));
data.LastShredGeneration = ShredGeneration;
data.ShredState = TOwnerData::VDISK_SHRED_STATE_COMPACT_REQUESTED;
}
@@ -3984,7 +3984,7 @@ void TPDisk::ProgressShredState() {
<< " sends shred request to VDisk# " << data.VDiskId
<< " ownerId# " << ownerId
<< " request# " << shredRequest->ToString());
- PCtx->ActorSystem->Send(data.CutLogId, shredRequest.Release());
+ PCtx->ActorSystem->Send(new IEventHandle(data.CutLogId, PCtx->PDiskActor, shredRequest.Release()));
data.ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_REQUESTED;
data.LastShredGeneration = ShredGeneration;
}
@@ -4011,8 +4011,9 @@ void TPDisk::ProgressShredState() {
LOG_NOTICE_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
"Shred request is finished at PDisk# " << PCtx->PDiskId
<< " ShredGeneration# " << ShredGeneration);
- for (TActorId &requester : ShredRequesters) {
- PCtx->ActorSystem->Send(requester, new TEvShredPDiskResult(NKikimrProto::OK, ShredGeneration, ""));
+ for (auto& [requester, cookie] : ShredRequesters) {
+ PCtx->ActorSystem->Send(new IEventHandle(requester, PCtx->PDiskActor, new TEvShredPDiskResult(
+ NKikimrProto::OK, ShredGeneration, ""), 0, cookie));
}
ShredRequesters.clear();
}
@@ -4029,27 +4030,28 @@ void TPDisk::ProcessShredPDisk(TShredPDisk& request) {
TGuard<TMutex> guard(StateMutex);
if (request.ShredGeneration < ShredGeneration) {
guard.Release();
- PCtx->ActorSystem->Send(request.Sender,
- new TEvShredPDiskResult(NKikimrProto::RACE, request.ShredGeneration,
- "A shred request with a higher generation is already in progress"));
+ PCtx->ActorSystem->Send(new IEventHandle(request.Sender, PCtx->PDiskActor, new TEvShredPDiskResult(
+ NKikimrProto::RACE, request.ShredGeneration, "A shred request with a higher generation is already in progress"),
+ 0, request.Cookie));
return;
}
if (request.ShredGeneration == ShredGeneration) {
// Do nothing, since we already have a shred request with the same generation.
// Just add the sender to the list of requesters.
- ShredRequesters.push_back(request.Sender);
+ ShredRequesters.emplace_back(request.Sender, request.Cookie);
return;
}
// ShredGeneration > request.ShredGeneration
if (ShredRequesters.size() > 0) {
- for (TActorId &requester : ShredRequesters) {
- PCtx->ActorSystem->Send(requester, new TEvShredPDiskResult(NKikimrProto::RACE, request.ShredGeneration,
- "A shred request with a higher generation is received"));
+ for (auto& [requester, cookie] : ShredRequesters) {
+ PCtx->ActorSystem->Send(new IEventHandle(requester, PCtx->PDiskActor, new TEvShredPDiskResult(
+ NKikimrProto::RACE, request.ShredGeneration, "A shred request with a higher generation is received"), 0,
+ cookie));
}
ShredRequesters.clear();
}
ShredGeneration = request.ShredGeneration;
- ShredRequesters.push_back(request.Sender);
+ ShredRequesters.emplace_back(request.Sender, request.Cookie);
ShredState = EShredStateSendPreShredCompactVDisk;
ProgressShredState();
}
@@ -4094,7 +4096,7 @@ void TPDisk::ProcessPreShredCompactVDiskResult(TPreShredCompactVDiskResult& requ
}
if (request.Status != NKikimrProto::OK) {
ShredState = EShredStateFailed;
- for (TActorId &requester : ShredRequesters) {
+ for (auto& [requester, cookie] : ShredRequesters) {
TStringStream str;
str << "Shred request failed at PDisk# " << PCtx->PDiskId
<< " for shredGeneration# " << request.ShredGeneration
@@ -4103,8 +4105,8 @@ void TPDisk::ProcessPreShredCompactVDiskResult(TPreShredCompactVDiskResult& requ
<< " replied with PreShredCompactVDiskResult status# " << request.Status
<< " and ErrorReason# " << request.ErrorReason;
LOG_ERROR_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, str.Str());
- PCtx->ActorSystem->Send(requester, new TEvShredPDiskResult(NKikimrProto::ERROR, request.ShredGeneration,
- str.Str()));
+ PCtx->ActorSystem->Send(new IEventHandle(requester, PCtx->PDiskActor, new TEvShredPDiskResult(
+ NKikimrProto::ERROR, request.ShredGeneration, str.Str()), 0, cookie));
}
ShredRequesters.clear();
return;
@@ -4153,7 +4155,7 @@ void TPDisk::ProcessShredVDiskResult(TShredVDiskResult& request) {
}
if (request.Status != NKikimrProto::OK) {
ShredState = EShredStateFailed;
- for (TActorId &requester : ShredRequesters) {
+ for (auto& [requester, cookie] : ShredRequesters) {
TStringStream str;
str << "Shred request failed at PDisk# " << PCtx->PDiskId
<< " for shredGeneration# " << request.ShredGeneration
@@ -4162,8 +4164,8 @@ void TPDisk::ProcessShredVDiskResult(TShredVDiskResult& request) {
<< " replied with status# " << request.Status
<< " and ErrorReason# " << request.ErrorReason;
LOG_ERROR_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, str.Str());
- PCtx->ActorSystem->Send(requester, new TEvShredPDiskResult(NKikimrProto::ERROR, request.ShredGeneration,
- str.Str()));
+ PCtx->ActorSystem->Send(new IEventHandle(requester, PCtx->PDiskActor, new TEvShredPDiskResult(
+ NKikimrProto::ERROR, request.ShredGeneration, str.Str()), 0, cookie));
}
ShredRequesters.clear();
return;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
index d0d9386b4f..94a1110ae3 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
@@ -157,7 +157,7 @@ public:
};
EShredState ShredState = EShredStateDefault;
ui64 ShredGeneration = 0;
- std::deque<TActorId> ShredRequesters;
+ std::deque<std::tuple<TActorId, ui64>> ShredRequesters;
// Chunks that are owned by killed owner, but have operations InFlight
TVector<TChunkIdx> QuarantineChunks;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h
index fd4790dfdb..a5f75eae97 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h
@@ -1144,6 +1144,7 @@ public:
class TShredPDisk : public TRequestBase {
public:
ui64 ShredGeneration;
+ ui64 Cookie;
TShredPDisk(NPDisk::TEvShredPDisk& ev, TActorId sender, TAtomicBase reqIdx)
: TRequestBase(sender, TReqId(TReqId::ShredPDisk, reqIdx), OwnerSystem, 0, NPriInternal::Other)