diff options
author | Alexander Rutkovsky <alexvru@ydb.tech> | 2025-01-28 18:38:04 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-01-28 15:38:04 +0000 |
commit | faa5f83347e212beb824e53cde09ea6b97f16e39 (patch) | |
tree | 362b5752e6934f70a2ee9f2fa9f6eab559810539 | |
parent | 30de7128afc75edae241e313aac2c54a0d9b9f02 (diff) | |
download | ydb-faa5f83347e212beb824e53cde09ea6b97f16e39.tar.gz |
Fix shred interface in PDisk (#13926)
4 files changed, 30 insertions, 23 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp index bab180b275..8b0c3ca485 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp @@ -65,11 +65,13 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> { ui32 SlotId = 0; bool IsShred = false; ui64 ShredGeneration = 0; + ui64 Cookie; - TInitQueueItem(const TActorId sender, const ui64 shredGeneration) + TInitQueueItem(const TActorId sender, const ui64 shredGeneration, ui64 cookie) : Sender(sender) , IsShred(true) , ShredGeneration(shredGeneration) + , Cookie(cookie) {} TInitQueueItem(TOwnerRound ownerRound, TVDiskID vDisk, ui64 pDiskGuid, TActorId sender, TActorId cutLogId, @@ -299,7 +301,7 @@ public: Become(&TThis::StateError); for (TList<TInitQueueItem>::iterator it = InitQueue.begin(); it != InitQueue.end(); ++it) { if (it->IsShred) { - Send(it->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, it->ShredGeneration, errorReason)); + Send(it->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, it->ShredGeneration, errorReason), 0, it->Cookie); if (PDisk) { PDisk->Mon.ShredPDisk.CountResponse(); } @@ -617,6 +619,7 @@ public: if (it->IsShred) { NPDisk::TEvShredPDisk evShredPDisk(it->ShredGeneration); auto* request = PDisk->ReqCreator.CreateFromEv<NPDisk::TShredPDisk>(evShredPDisk, it->Sender); + request->Cookie = it->Cookie; PDisk->InputRequest(request); } else { NPDisk::TEvYardInit evInit(it->OwnerRound, it->VDisk, it->PDiskGuid, it->CutLogId, it->WhiteboardProxyId, @@ -667,7 +670,7 @@ public: void InitHandle(NPDisk::TEvShredPDisk::TPtr &ev) { const NPDisk::TEvShredPDisk &evShredPDisk = *ev->Get(); - InitQueue.emplace_back(ev->Sender, evShredPDisk.ShredGeneration); + InitQueue.emplace_back(ev->Sender, evShredPDisk.ShredGeneration, ev->Cookie); } void InitHandle(NPDisk::TEvPreShredCompactVDiskResult::TPtr &ev) { @@ -843,7 +846,7 @@ public: void ErrorHandle(NPDisk::TEvShredPDisk::TPtr &ev) { // Respond with error, can't shred in this state. - Send(ev->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, 0, StateErrorReason)); + Send(ev->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, 0, StateErrorReason), 0, ev->Cookie); } void ErrorHandle(NPDisk::TEvPreShredCompactVDiskResult::TPtr &ev) { @@ -1060,6 +1063,7 @@ public: void Handle(NPDisk::TEvShredPDisk::TPtr &ev) { auto* request = PDisk->ReqCreator.CreateFromEv<TShredPDisk>(*ev->Get(), ev->Sender); + request->Cookie = ev->Cookie; PDisk->InputRequest(request); } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp index f3c157bc11..95ff2dc59e 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp @@ -3935,7 +3935,7 @@ void TPDisk::ProgressShredState() { << " sends compact request to VDisk# " << data.VDiskId << " ownerId# " << ownerId << " request# " << compactRequest->ToString()); - PCtx->ActorSystem->Send(data.CutLogId, compactRequest.Release()); + PCtx->ActorSystem->Send(new IEventHandle(data.CutLogId, PCtx->PDiskActor, compactRequest.Release())); data.LastShredGeneration = ShredGeneration; data.ShredState = TOwnerData::VDISK_SHRED_STATE_COMPACT_REQUESTED; } @@ -3984,7 +3984,7 @@ void TPDisk::ProgressShredState() { << " sends shred request to VDisk# " << data.VDiskId << " ownerId# " << ownerId << " request# " << shredRequest->ToString()); - PCtx->ActorSystem->Send(data.CutLogId, shredRequest.Release()); + PCtx->ActorSystem->Send(new IEventHandle(data.CutLogId, PCtx->PDiskActor, shredRequest.Release())); data.ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_REQUESTED; data.LastShredGeneration = ShredGeneration; } @@ -4011,8 +4011,9 @@ void TPDisk::ProgressShredState() { LOG_NOTICE_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, "Shred request is finished at PDisk# " << PCtx->PDiskId << " ShredGeneration# " << ShredGeneration); - for (TActorId &requester : ShredRequesters) { - PCtx->ActorSystem->Send(requester, new TEvShredPDiskResult(NKikimrProto::OK, ShredGeneration, "")); + for (auto& [requester, cookie] : ShredRequesters) { + PCtx->ActorSystem->Send(new IEventHandle(requester, PCtx->PDiskActor, new TEvShredPDiskResult( + NKikimrProto::OK, ShredGeneration, ""), 0, cookie)); } ShredRequesters.clear(); } @@ -4029,27 +4030,28 @@ void TPDisk::ProcessShredPDisk(TShredPDisk& request) { TGuard<TMutex> guard(StateMutex); if (request.ShredGeneration < ShredGeneration) { guard.Release(); - PCtx->ActorSystem->Send(request.Sender, - new TEvShredPDiskResult(NKikimrProto::RACE, request.ShredGeneration, - "A shred request with a higher generation is already in progress")); + PCtx->ActorSystem->Send(new IEventHandle(request.Sender, PCtx->PDiskActor, new TEvShredPDiskResult( + NKikimrProto::RACE, request.ShredGeneration, "A shred request with a higher generation is already in progress"), + 0, request.Cookie)); return; } if (request.ShredGeneration == ShredGeneration) { // Do nothing, since we already have a shred request with the same generation. // Just add the sender to the list of requesters. - ShredRequesters.push_back(request.Sender); + ShredRequesters.emplace_back(request.Sender, request.Cookie); return; } // ShredGeneration > request.ShredGeneration if (ShredRequesters.size() > 0) { - for (TActorId &requester : ShredRequesters) { - PCtx->ActorSystem->Send(requester, new TEvShredPDiskResult(NKikimrProto::RACE, request.ShredGeneration, - "A shred request with a higher generation is received")); + for (auto& [requester, cookie] : ShredRequesters) { + PCtx->ActorSystem->Send(new IEventHandle(requester, PCtx->PDiskActor, new TEvShredPDiskResult( + NKikimrProto::RACE, request.ShredGeneration, "A shred request with a higher generation is received"), 0, + cookie)); } ShredRequesters.clear(); } ShredGeneration = request.ShredGeneration; - ShredRequesters.push_back(request.Sender); + ShredRequesters.emplace_back(request.Sender, request.Cookie); ShredState = EShredStateSendPreShredCompactVDisk; ProgressShredState(); } @@ -4094,7 +4096,7 @@ void TPDisk::ProcessPreShredCompactVDiskResult(TPreShredCompactVDiskResult& requ } if (request.Status != NKikimrProto::OK) { ShredState = EShredStateFailed; - for (TActorId &requester : ShredRequesters) { + for (auto& [requester, cookie] : ShredRequesters) { TStringStream str; str << "Shred request failed at PDisk# " << PCtx->PDiskId << " for shredGeneration# " << request.ShredGeneration @@ -4103,8 +4105,8 @@ void TPDisk::ProcessPreShredCompactVDiskResult(TPreShredCompactVDiskResult& requ << " replied with PreShredCompactVDiskResult status# " << request.Status << " and ErrorReason# " << request.ErrorReason; LOG_ERROR_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, str.Str()); - PCtx->ActorSystem->Send(requester, new TEvShredPDiskResult(NKikimrProto::ERROR, request.ShredGeneration, - str.Str())); + PCtx->ActorSystem->Send(new IEventHandle(requester, PCtx->PDiskActor, new TEvShredPDiskResult( + NKikimrProto::ERROR, request.ShredGeneration, str.Str()), 0, cookie)); } ShredRequesters.clear(); return; @@ -4153,7 +4155,7 @@ void TPDisk::ProcessShredVDiskResult(TShredVDiskResult& request) { } if (request.Status != NKikimrProto::OK) { ShredState = EShredStateFailed; - for (TActorId &requester : ShredRequesters) { + for (auto& [requester, cookie] : ShredRequesters) { TStringStream str; str << "Shred request failed at PDisk# " << PCtx->PDiskId << " for shredGeneration# " << request.ShredGeneration @@ -4162,8 +4164,8 @@ void TPDisk::ProcessShredVDiskResult(TShredVDiskResult& request) { << " replied with status# " << request.Status << " and ErrorReason# " << request.ErrorReason; LOG_ERROR_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, str.Str()); - PCtx->ActorSystem->Send(requester, new TEvShredPDiskResult(NKikimrProto::ERROR, request.ShredGeneration, - str.Str())); + PCtx->ActorSystem->Send(new IEventHandle(requester, PCtx->PDiskActor, new TEvShredPDiskResult( + NKikimrProto::ERROR, request.ShredGeneration, str.Str()), 0, cookie)); } ShredRequesters.clear(); return; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h index d0d9386b4f..94a1110ae3 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h @@ -157,7 +157,7 @@ public: }; EShredState ShredState = EShredStateDefault; ui64 ShredGeneration = 0; - std::deque<TActorId> ShredRequesters; + std::deque<std::tuple<TActorId, ui64>> ShredRequesters; // Chunks that are owned by killed owner, but have operations InFlight TVector<TChunkIdx> QuarantineChunks; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h index fd4790dfdb..a5f75eae97 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h @@ -1144,6 +1144,7 @@ public: class TShredPDisk : public TRequestBase { public: ui64 ShredGeneration; + ui64 Cookie; TShredPDisk(NPDisk::TEvShredPDisk& ev, TActorId sender, TAtomicBase reqIdx) : TRequestBase(sender, TReqId(TReqId::ShredPDisk, reqIdx), OwnerSystem, 0, NPriInternal::Other) |