diff options
author | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-28 13:44:34 +0300 |
---|---|---|
committer | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-28 13:44:34 +0300 |
commit | 3f2d373fed52ba10087ee33afe3842f6a678e518 (patch) | |
tree | 2ea5eb8766b6624d5bb4da655b96f27bb4467a57 | |
parent | b13fbd750496fa994449f37c8c8a49bed18ccd6e (diff) | |
download | ydb-3f2d373fed52ba10087ee33afe3842f6a678e518.tar.gz |
Support ExtraBlockChecks in EvPuts KIKIMR-14867
ref:4acb5bec675beb89cfeb968755c7f60155644e3f
27 files changed, 295 insertions, 84 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index c9e6504c45f..dd442e1724e 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -890,6 +890,7 @@ struct TEvBlobStorage { const ETactic Tactic; mutable NLWTrace::TOrbit Orbit; ui32 RestartCounter = 0; + std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; // (TabletId, Generation) pairs TEvPut(const TLogoBlobID &id, const TString &buffer, TInstant deadline, NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog, diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp index ed38409713b..f4ae30da968 100644 --- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp +++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp @@ -66,6 +66,8 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu return "<unknown>"; } + bool ExtraBlockChecksSupport = false; + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::BS_QUEUE_ACTOR; @@ -453,7 +455,7 @@ private: case EState::READY: QLOG_NOTICE_S("BSQ96", "connection lost status# " << NKikimrProto::EReplyStatus_Name(status) << " errorReason# " << errorReason << " timeout# " << timeout); - ctx.Send(BlobStorageProxy, new TEvProxyQueueState(VDiskId, QueueId, false)); + ctx.Send(BlobStorageProxy, new TEvProxyQueueState(VDiskId, QueueId, false, false)); Queue.DrainQueue(status, TStringBuilder() << "BS_QUEUE: " << errorReason, ctx); DrainStatus(status, ctx); break; @@ -551,7 +553,8 @@ private: const auto& record = ev->Get()->Record; if (record.GetStatus() != NKikimrProto::NOTREADY) { - ctx.Send(BlobStorageProxy, new TEvProxyQueueState(VDiskId, QueueId, true)); + ExtraBlockChecksSupport = record.GetExtraBlockChecksSupport(); + ctx.Send(BlobStorageProxy, new TEvProxyQueueState(VDiskId, QueueId, true, ExtraBlockChecksSupport)); if (record.HasExpectedMsgId()) { Queue.SetMessageId(NBackpressure::TMessageId(record.GetExpectedMsgId())); } @@ -758,7 +761,7 @@ private: << " RemoteVDisk# " << RemoteVDisk << " VDiskId# " << VDiskId << " IsConnected# " << isConnected); - ctx.Send(ev->Sender, new TEvProxyQueueState(VDiskId, QueueId, isConnected)); + ctx.Send(ev->Sender, new TEvProxyQueueState(VDiskId, QueueId, isConnected, isConnected && ExtraBlockChecksSupport)); } #define QueueRequestHFunc(TEvType) \ diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.h b/ydb/core/blobstorage/backpressure/queue_backpressure_client.h index e6cc006a7a9..74c0f13edc7 100644 --- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.h +++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.h @@ -16,11 +16,14 @@ namespace NKikimr { TVDiskID VDiskId; NKikimrBlobStorage::EVDiskQueueId QueueId; bool IsConnected; + bool ExtraBlockChecksSupport; - TEvProxyQueueState(const TVDiskID &vDiskId, NKikimrBlobStorage::EVDiskQueueId queueId, bool isConnected) + TEvProxyQueueState(const TVDiskID &vDiskId, NKikimrBlobStorage::EVDiskQueueId queueId, bool isConnected, + bool extraBlockChecksSupport) : VDiskId(vDiskId) , QueueId(queueId) , IsConnected(isConnected) + , ExtraBlockChecksSupport(extraBlockChecksSupport) {} TString ToString() const { @@ -28,6 +31,7 @@ namespace NKikimr { str << "{VDiskId# " << VDiskId.ToString(); str << " QueueId# " << static_cast<ui32>(QueueId); str << " IsConnected# " << (IsConnected ? "true" : "false"); + str << " ExtraBlockChecksSupport# " << (ExtraBlockChecksSupport ? "true" : "false"); str << "}"; return str.Str(); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index eb6d8394f02..344fde247ef 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -346,9 +346,11 @@ void TGroupDiskRequests::AddGet(const ui32 diskOrderNumber, const TLogoBlobID &i } void TGroupDiskRequests::AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TString buffer, - TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx) { + TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, + ui8 blobIdx) { Y_VERIFY(diskOrderNumber < DiskRequestsForOrderNumber.size()); - DiskRequestsForOrderNumber[diskOrderNumber].PutsToSend.emplace_back(id, buffer, putReason, isHandoff, blobIdx); + DiskRequestsForOrderNumber[diskOrderNumber].PutsToSend.emplace_back(id, buffer, putReason, isHandoff, + extraBlockChecks, blobIdx); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -378,13 +380,17 @@ void TBlackboard::AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize) { } } -void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData) { +void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData, + std::vector<std::pair<ui64, ui32>> *extraBlockChecks) { Y_VERIFY(bool(id)); Y_VERIFY(id.PartId() == 0); Y_VERIFY(id.BlobSize() != 0); TBlobState &state = BlobStates[id]; - if (!bool(state.Id)) { + if (!state.Id) { state.Init(id, *Info); + state.ExtraBlockChecks = extraBlockChecks; + } else { + Y_VERIFY(state.ExtraBlockChecks == extraBlockChecks); } state.AddPartToPut(partIdx, partData); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h index fe926648a83..cbffa37eb0c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h @@ -81,6 +81,7 @@ struct TBlobState { NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; bool IsChanged = false; bool IsDone = false; + std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks = nullptr; void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info); void AddNeeded(ui64 begin, ui64 size); @@ -127,13 +128,16 @@ struct TDiskPutRequest { EPutReason Reason; bool IsHandoff; ui8 BlobIdx; + std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks; - TDiskPutRequest(const TLogoBlobID &id, TString buffer, EPutReason reason, bool isHandoff, ui8 blobIdx = 0) + TDiskPutRequest(const TLogoBlobID &id, TString buffer, EPutReason reason, bool isHandoff, + std::vector<std::pair<ui64, ui32>> *extraBlockChecks, ui8 blobIdx = 0) : Id(id) , Buffer(std::move(buffer)) , Reason(reason) , IsHandoff(isHandoff) , BlobIdx(blobIdx) + , ExtraBlockChecks(extraBlockChecks) {} }; @@ -151,7 +155,8 @@ struct TGroupDiskRequests { void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet); void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const ui32 shift, const ui32 size); void AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TString buffer, - TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx = 0); + TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, + ui8 blobIdx = 0); }; struct TBlackboard; @@ -194,7 +199,7 @@ struct TBlackboard { {} void AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize); - void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData); + void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData, std::vector<std::pair<ui64, ui32>> *extraBlockChecks); void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0); void MoveBlobStateToDone(const TLogoBlobID &id); void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp index 0583b454df6..f1d01776b6c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp @@ -352,7 +352,7 @@ void TGetImpl::PrepareVPuts(TLogContext &logCtx, } bytes += put.Buffer.size(); lastItemCount++; - vMultiPut->AddVPut(put.Id, put.Buffer, &cookie); + vMultiPut->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks); } vMultiPut->Record.SetCookie(TVMultiPutCookie(diskOrderNumber, lastItemCount, VMultiPutRequests)); ++VMultiPutRequests; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index df51c9a78e0..e2b717d13f6 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -45,9 +45,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt NWilson::TTraceId TraceId; NLWTrace::TOrbit Orbit; bool Replied = false; + std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; TMultiPutItemInfo(TLogoBlobID id, const TString& buffer, TActorId recipient, ui64 cookie, - NWilson::TTraceId traceId, NLWTrace::TOrbit &&orbit) + NWilson::TTraceId traceId, NLWTrace::TOrbit &&orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks) : BlobId(id) , Buffer(buffer) , BufferSize(buffer.size()) @@ -55,6 +56,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt , Cookie(cookie) , TraceId(std::move(traceId)) , Orbit(std::move(orbit)) + , ExtraBlockChecks(std::move(extraBlockChecks)) {} }; @@ -92,6 +94,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt const bool IsMultiPutMode; + bool RequireExtraBlockChecks = false; + void SanityCheck() { if (RequestsSent <= MaxSaneRequests) { return; @@ -458,11 +462,13 @@ public: , IsAccelerated(false) , IsAccelerateScheduled(false) , IsMultiPutMode(false) + , RequireExtraBlockChecks(!ev->ExtraBlockChecks.empty()) { if (ev->Orbit.HasShuttles()) { RootCauseTrack.IsOn = true; } - ItemsInfo.emplace_back(ev->Id, ev->Buffer, source, cookie, NWilson::TTraceId(), std::move(ev->Orbit)); + ItemsInfo.emplace_back(ev->Id, ev->Buffer, source, cookie, NWilson::TTraceId(), std::move(ev->Orbit), + std::move(ev->ExtraBlockChecks)); LWPROBE(DSProxyBlobPutTactics, ItemsInfo[0].BlobId.TabletID(), Info->GroupID, ItemsInfo[0].BlobId.ToString(), Tactic, NKikimrBlobStorage::EPutHandleClass_Name(HandleClass)); ReportBytes(ItemsInfo[0].Buffer.capacity() + sizeof(*this)); @@ -507,17 +513,22 @@ public: { Y_VERIFY_DEBUG(events.size() <= MaxBatchedPutRequests); for (auto &ev : events) { - Deadline = Max(Deadline, ev->Get()->Deadline); - if (ev->Get()->Orbit.HasShuttles()) { + auto& msg = *ev->Get(); + Deadline = Max(Deadline, msg.Deadline); + if (msg.Orbit.HasShuttles()) { RootCauseTrack.IsOn = true; } + if (!msg.ExtraBlockChecks.empty()) { + RequireExtraBlockChecks = true; + } ItemsInfo.emplace_back( - ev->Get()->Id, - ev->Get()->Buffer, + msg.Id, + msg.Buffer, ev->Sender, ev->Cookie, std::move(ev->TraceId), - std::move(ev->Get()->Orbit) + std::move(msg.Orbit), + std::move(msg.ExtraBlockChecks) ); LWPROBE(DSProxyBlobPutTactics, ItemsInfo.back().BlobId.TabletID(), Info->GroupID, ItemsInfo.back().BlobId.ToString(), Tactic, NKikimrBlobStorage::EPutHandleClass_Name(HandleClass)); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp index 70b774636ca..757b313cfb0 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp @@ -59,15 +59,15 @@ void TPutImpl::PrepareOneReply(NKikimrProto::EReplyStatus status, TLogoBlobID bl void TPutImpl::PrepareReply(NKikimrProto::EReplyStatus status, TLogContext &logCtx, TString errorReason, TPutResultVec &outPutResults) { A_LOG_DEBUG_SX(logCtx, "BPP34", "PrepareReply status# " << status << " errorReason# " << errorReason); - for (ui64 idx = 0; idx < BlobIds.size(); ++idx) { + for (ui64 idx = 0; idx < Blobs.size(); ++idx) { if (IsDone[idx]) { - A_LOG_DEBUG_SX(logCtx, "BPP35", "blob# " << BlobIds[idx].ToString() << + A_LOG_DEBUG_SX(logCtx, "BPP35", "blob# " << Blobs[idx].ToString() << " idx# " << idx << " is sent, skipped"); continue; } - outPutResults.emplace_back(idx, new TEvBlobStorage::TEvPutResult(status, BlobIds[idx], StatusFlags, Info->GroupID, - ApproximateFreeSpaceShare)); + outPutResults.emplace_back(idx, new TEvBlobStorage::TEvPutResult(status, Blobs[idx].Id, StatusFlags, + Info->GroupID, ApproximateFreeSpaceShare)); outPutResults.back().second->ErrorReason = errorReason; NLog::EPriority priority = GetPriorityForReply(Info->PutErrorMuteChecker, status); @@ -87,7 +87,7 @@ void TPutImpl::PrepareReply(TLogContext &logCtx, TString errorReason, for (auto item : finished) { auto &[blobId, state] = *item; const ui64 idx = state.BlobIdx; - Y_VERIFY(blobId == BlobIds[idx], "BlobIdx# %" PRIu64 " BlobState# %s Blackboard# %s", + Y_VERIFY(blobId == Blobs[idx].Id, "BlobIdx# %" PRIu64 " BlobState# %s Blackboard# %s", idx, state.ToString().c_str(), Blackboard.ToString().c_str()); Y_VERIFY(!IsDone[idx]); Y_VERIFY(state.Status != NKikimrProto::UNKNOWN); @@ -126,7 +126,7 @@ TString TPutImpl::DumpFullState() const { str << Endl; str << " Blackboard# " << Blackboard.ToString(); str << Endl; - str << " BlobIds# " << BlobIds.ToString(); + str << " Blobs# " << Blobs.ToString(); str << Endl; str << "IsDone# " << IsDone.ToString(); str << Endl; @@ -147,12 +147,16 @@ TString TPutImpl::DumpFullState() const { } bool TPutImpl::MarkBlobAsSent(ui64 idx) { - Y_VERIFY(idx < BlobIds.size()); + Y_VERIFY(idx < Blobs.size()); Y_VERIFY(!IsDone[idx]); - Blackboard.MoveBlobStateToDone(BlobIds[idx]); + Blackboard.MoveBlobStateToDone(Blobs[idx].Id); IsDone[idx] = true; DoneBlobs++; return true; } }//NKikimr + +Y_DECLARE_OUT_SPEC(, NKikimr::TPutImpl::TBlobInfo, stream, value) { + value.Output(stream); +} diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h index b8b74c1cde7..b95d0cf3485 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h @@ -44,7 +44,31 @@ private: const TEvBlobStorage::TEvPut::ETactic Tactic; - TBatchedVec<TLogoBlobID> BlobIds; + struct TBlobInfo { + TLogoBlobID Id; + std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; + + void Output(IOutputStream& s) const { + s << Id; + if (!ExtraBlockChecks.empty()) { + s << "{"; + for (auto it = ExtraBlockChecks.begin(); it != ExtraBlockChecks.end(); ++it) { + if (it != ExtraBlockChecks.begin()) { + s << ", "; + } + s << it->first << ":" << it->second; + } + s << "}"; + } + } + + TString ToString() const { + TStringStream s; + Output(s); + return s.Str(); + } + }; + TBatchedVec<TBlobInfo> Blobs; TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVPutResponses; TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVMultiPutResponses; @@ -53,6 +77,8 @@ private: TString ErrorDescription; + friend void ::Out<TBlobInfo>(IOutputStream&, const TBlobInfo&); + public: TPutImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, TEvBlobStorage::TEvPut *ev, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, @@ -66,10 +92,10 @@ public: , Mon(mon) , EnableRequestMod3x3ForMinLatecy(enableRequestMod3x3ForMinLatecy) , Tactic(ev->Tactic) - , BlobIds({ev->Id}) + , Blobs({{ev->Id, std::move(ev->ExtraBlockChecks)}}) { - Y_VERIFY(BlobIds.size()); - Y_VERIFY(BlobIds.size() <= MaxBatchedPutRequests); + Y_VERIFY(Blobs.size()); + Y_VERIFY(Blobs.size() <= MaxBatchedPutRequests); } TPutImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, @@ -88,13 +114,14 @@ public: { Y_VERIFY(events.size(), "TEvPut vector is empty"); for (auto &ev : events) { - Y_VERIFY(ev->Get()->HandleClass == putHandleClass); - Y_VERIFY(ev->Get()->Tactic == tactic); - BlobIds.push_back(ev->Get()->Id); - Deadline = Max(Deadline, ev->Get()->Deadline); + auto& msg = *ev->Get(); + Y_VERIFY(msg.HandleClass == putHandleClass); + Y_VERIFY(msg.Tactic == tactic); + Blobs.push_back({msg.Id, std::move(msg.ExtraBlockChecks)}); + Deadline = Max(Deadline, msg.Deadline); } - Y_VERIFY(BlobIds.size()); - Y_VERIFY(BlobIds.size() <= MaxBatchedPutRequests); + Y_VERIFY(Blobs.size()); + Y_VERIFY(Blobs.size() <= MaxBatchedPutRequests); } NKikimrBlobStorage::EPutHandleClass GetPutHandleClass() const { @@ -109,17 +136,17 @@ public: void GenerateInitialRequests(TLogContext &logCtx, TBatchedVec<TDataPartSet> &partSets, TDeque<std::unique_ptr<TVPutEvent>> &outVPuts) { Y_UNUSED(logCtx); - Y_VERIFY_S(partSets.size() == BlobIds.size(), "partSets.size# " << partSets.size() - << " BlobIds.size# " << BlobIds.size()); + Y_VERIFY_S(partSets.size() == Blobs.size(), "partSets.size# " << partSets.size() + << " Blobs.size# " << Blobs.size()); const ui32 totalParts = Info->Type.TotalPartCount(); - for (ui64 blobIdx = 0; blobIdx < BlobIds.size(); ++blobIdx) { - TLogoBlobID &blobId = BlobIds[blobIdx]; + for (ui64 blobIdx = 0; blobIdx < Blobs.size(); ++blobIdx) { + TBlobInfo& blob = Blobs[blobIdx]; for (ui32 i = 0; i < totalParts; ++i) { REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(partSets[blobIdx].Parts[i].OwnedString.Data(), partSets[blobIdx].Parts[i].OwnedString.Size()); - Blackboard.AddPartToPut(blobId, i, partSets[blobIdx].Parts[i].OwnedString); + Blackboard.AddPartToPut(blob.Id, i, partSets[blobIdx].Parts[i].OwnedString, &blob.ExtraBlockChecks); } - Blackboard.MarkBlobReadyToPut(blobId, blobIdx); + Blackboard.MarkBlobReadyToPut(blob.Id, blobIdx); } TPutResultVec putResults; @@ -322,7 +349,7 @@ public: } Step(logCtx, outVPutEvents, outPutResults); - Y_VERIFY_S(DoneBlobs == BlobIds.size() || requests > responses, + Y_VERIFY_S(DoneBlobs == Blobs.size() || requests > responses, "No put result while" << " Type# " << putType << " DoneBlobs# " << DoneBlobs @@ -420,13 +447,21 @@ protected: if constexpr (isVPut) { auto vPut = std::make_unique<TEvBlobStorage::TEvVPut>(put.Id, put.Buffer, vDiskId, false, &cookie, Deadline, Blackboard.PutHandleClass); + auto& record = vPut->Record; + if (put.ExtraBlockChecks) { + for (const auto& [tabletId, generation] : *put.ExtraBlockChecks) { + auto *p = record.AddExtraBlockChecks(); + p->SetTabletId(tabletId); + p->SetGeneration(generation); + } + } R_LOG_DEBUG_SX(logCtx, "BPP20", "Send put to orderNumber# " << diskOrderNumber << " idx# " << idx << " vPut# " << vPut->ToString()); outVPutEvents.push_back(std::move(vPut)); ++VPutRequests; ReceivedVPutResponses.push_back(false); } else if constexpr (isVMultiPut) { - outVPutEvents.back()->AddVPut(put.Id, put.Buffer, &cookie); + outVPutEvents.back()->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks); } if (put.IsHandoff) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp index 9cf4b55e589..f2dcdce5881 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp @@ -205,7 +205,8 @@ namespace NKikimr { Y_VERIFY(Sessions); auto *msg = ev->Get(); Y_VERIFY(Topology); - Sessions->QueueConnectUpdate(Topology->GetOrderNumber(msg->VDiskId), msg->QueueId, msg->IsConnected); + Sessions->QueueConnectUpdate(Topology->GetOrderNumber(msg->VDiskId), msg->QueueId, msg->IsConnected, + msg->ExtraBlockChecksSupport, *Topology); if (msg->IsConnected && (CurrentStateFunc() == &TThis::StateEstablishingSessions || CurrentStateFunc() == &TThis::StateEstablishingSessionsTimeout)) { SwitchToWorkWhenGoodToGo(); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp index c73c20712ec..89304e2cd34 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp @@ -333,7 +333,8 @@ void TStrategyBase::PreparePutsForPartPlacement(TLogContext &logCtx, TBlobState << " blob Id# " << partId.ToString()); Y_VERIFY(state.Parts[record.PartIdx].Data.IsMonolith()); groupDiskRequests.AddPut(disk.OrderNumber, partId, state.Parts[record.PartIdx].Data.GetMonolith(), - TDiskPutRequest::ReasonInitial, info.Type.IsHandoffInSubgroup(record.VDiskIdx), state.BlobIdx); + TDiskPutRequest::ReasonInitial, info.Type.IsHandoffInSubgroup(record.VDiskIdx), + state.ExtraBlockChecks, state.BlobIdx); disk.DiskParts[record.PartIdx].Situation = TBlobState::ESituation::Sent; } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h index b372e891ebd..5ef2c7ddf4d 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h @@ -140,6 +140,7 @@ protected: GetDataBuffer(state, info), diskIdx == group.DiskIdx[0] ? TDiskPutRequest::ReasonInitial : TDiskPutRequest::ReasonError, diskIdx != group.DiskIdx[0], + state.ExtraBlockChecks, state.BlobIdx); s = TBlobState::ESituation::Sent; any |= {&info.GetTopology(), diskIdx}; @@ -172,6 +173,7 @@ protected: TString(), handoff ? TDiskPutRequest::ReasonError : TDiskPutRequest::ReasonInitial, handoff, + state.ExtraBlockChecks, state.BlobIdx); part.Situation = TBlobState::ESituation::Sent; any |= {&info.GetTopology(), (ui8)diskIdx}; diff --git a/ydb/core/blobstorage/dsproxy/group_sessions.cpp b/ydb/core/blobstorage/dsproxy/group_sessions.cpp index 9923c6b7843..27223501628 100644 --- a/ydb/core/blobstorage/dsproxy/group_sessions.cpp +++ b/ydb/core/blobstorage/dsproxy/group_sessions.cpp @@ -85,6 +85,7 @@ TGroupSessions::TGroupSessions(const TIntrusivePtr<TBlobStorageGroupInfo>& info, auto& q = stateVDisk.Queues.GetQueue(queueId); q.ActorId = queue; q.FlowRecord = std::move(flowRecord); + q.ExtraBlockChecksSupport = false; } } } @@ -114,11 +115,18 @@ bool TGroupSessions::GoodToGo(const TBlobStorageGroupInfo::TTopology& topology, : topology.GetQuorumChecker().CheckQuorumForGroup(connected); } -void TGroupSessions::QueueConnectUpdate(ui32 orderNumber, NKikimrBlobStorage::EVDiskQueueId queueId, bool connected) { +void TGroupSessions::QueueConnectUpdate(ui32 orderNumber, NKikimrBlobStorage::EVDiskQueueId queueId, bool connected, + bool extraGroupChecksSupport, const TBlobStorageGroupInfo::TTopology& topology) { + const auto v = topology.GetVDiskId(orderNumber); + const ui32 fdom = topology.GetFailDomainOrderNumber(v); + auto& q = GroupQueues->FailDomains[fdom].VDisks[v.VDisk].Queues.GetQueue(queueId); + if (connected) { ConnectedQueuesMask[orderNumber] |= 1 << queueId; + q.ExtraBlockChecksSupport = extraGroupChecksSupport; } else { ConnectedQueuesMask[orderNumber] &= ~(1 << queueId); + q.ExtraBlockChecksSupport = false; } } diff --git a/ydb/core/blobstorage/dsproxy/group_sessions.h b/ydb/core/blobstorage/dsproxy/group_sessions.h index 9284051408a..120ad2da4c9 100644 --- a/ydb/core/blobstorage/dsproxy/group_sessions.h +++ b/ydb/core/blobstorage/dsproxy/group_sessions.h @@ -21,6 +21,7 @@ namespace NKikimr { struct TQueue { TActorId ActorId; TIntrusivePtr<NBackpressure::TFlowRecord> FlowRecord; + bool ExtraBlockChecksSupport; }; TQueue PutTabletLog; TQueue PutAsyncBlob; @@ -65,10 +66,6 @@ namespace NKikimr { } } - TActorId& QueueForQueueId(NKikimrBlobStorage::EVDiskQueueId queueId) { - return GetQueue(queueId).ActorId; - } - TIntrusivePtr<NBackpressure::TFlowRecord>& FlowRecordForQueueId(NKikimrBlobStorage::EVDiskQueueId queueId) { return GetQueue(queueId).FlowRecord; } @@ -78,9 +75,25 @@ namespace NKikimr { return flowRecord ? flowRecord->GetPredictedDelayNs() : 0; } + template<typename T> + static void ValidateEvent(TQueue& /*queue*/, const T& /*event*/) + {} + + static void ValidateEvent(TQueue& queue, const TEvBlobStorage::TEvVPut& event) { + Y_VERIFY(!event.Record.ExtraBlockChecksSize() || queue.ExtraBlockChecksSupport); + } + + static void ValidateEvent(TQueue& queue, const TEvBlobStorage::TEvVMultiPut& event) { + for (const auto& item : event.Record.GetItems()) { + Y_VERIFY(!item.ExtraBlockChecksSize() || queue.ExtraBlockChecksSupport); + } + } + template<typename TEvent> TActorId QueueForEvent(const TEvent& event) { - return QueueForQueueId(VDiskQueueId(event)); + TQueue& queue = GetQueue(VDiskQueueId(event)); + ValidateEvent(queue, event); + return queue.ActorId; } TString ToString() const { @@ -220,7 +233,8 @@ namespace NKikimr { const TActorId& monActor, const TActorId& proxyActor); void Poison(); bool GoodToGo(const TBlobStorageGroupInfo::TTopology& topology, bool waitForAllVDisks); - void QueueConnectUpdate(ui32 orderNumber, NKikimrBlobStorage::EVDiskQueueId queueId, bool connected); + void QueueConnectUpdate(ui32 orderNumber, NKikimrBlobStorage::EVDiskQueueId queueId, bool connected, + bool extraBlockChecksSupport, const TBlobStorageGroupInfo::TTopology& topology); ui32 GetNumUnconnectedDisks(); }; diff --git a/ydb/core/blobstorage/ut_blobstorage/CMakeLists.darwin.txt b/ydb/core/blobstorage/ut_blobstorage/CMakeLists.darwin.txt index 20737cdd499..9bcfefe2f87 100644 --- a/ydb/core/blobstorage/ut_blobstorage/CMakeLists.darwin.txt +++ b/ydb/core/blobstorage/ut_blobstorage/CMakeLists.darwin.txt @@ -34,6 +34,7 @@ target_sources(ydb-core-blobstorage-ut_blobstorage PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/decommit_3dc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/defrag.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/encryption.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/gc_quorum_3dc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/main.cpp diff --git a/ydb/core/blobstorage/ut_blobstorage/CMakeLists.linux.txt b/ydb/core/blobstorage/ut_blobstorage/CMakeLists.linux.txt index c29b009099a..3ac6ede19a9 100644 --- a/ydb/core/blobstorage/ut_blobstorage/CMakeLists.linux.txt +++ b/ydb/core/blobstorage/ut_blobstorage/CMakeLists.linux.txt @@ -38,6 +38,7 @@ target_sources(ydb-core-blobstorage-ut_blobstorage PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/decommit_3dc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/defrag.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/encryption.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/gc_quorum_3dc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/main.cpp diff --git a/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp b/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp new file mode 100644 index 00000000000..cac7bfd7f3f --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp @@ -0,0 +1,62 @@ +#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> + +Y_UNIT_TEST_SUITE(ExtraBlockChecks) { + Y_UNIT_TEST(Basic) { + TEnvironmentSetup env(TEnvironmentSetup::TSettings{ + .NodeCount = 8, + .Erasure = TBlobStorageGroupType::Erasure4Plus2Block, + }); + + auto& runtime = env.Runtime; + + env.CreateBoxAndPool(1, 1); + auto groups = env.GetGroups(); + UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1); + const TIntrusivePtr<TBlobStorageGroupInfo> info = env.GetGroupInfo(groups.front()); + + const auto& edge = runtime->AllocateEdgeActor(1, __FILE__, __LINE__); + runtime->WrapInActorContext(edge, [&] { + SendToBSProxy(edge, info->GroupID, new TEvBlobStorage::TEvBlock(1, 10, TInstant::Max())); + SendToBSProxy(edge, info->GroupID, new TEvBlobStorage::TEvBlock(2, 10, TInstant::Max())); + }); + { + auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvBlockResult>(edge, false); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvBlockResult>(edge, false); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + } + + const TString data = "data"; + + runtime->WrapInActorContext(edge, [&] { + auto ev = std::make_unique<TEvBlobStorage::TEvPut>(TLogoBlobID(1, 11, 1, 0, data.size(), 1), data, TInstant::Max()); + ev->ExtraBlockChecks.emplace_back(2, 10); + SendToBSProxy(edge, info->GroupID, ev.release()); + }); + { + auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge, false); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::BLOCKED); + } + + runtime->WrapInActorContext(edge, [&] { + SendToBSProxy(edge, info->GroupID, new TEvBlobStorage::TEvPut(TLogoBlobID(1, 11, 1, 0, data.size(), 2), data, TInstant::Max())); + }); + { + auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge, false); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + } + + runtime->WrapInActorContext(edge, [&] { + auto ev = std::make_unique<TEvBlobStorage::TEvPut>(TLogoBlobID(1, 11, 1, 0, data.size(), 3), data, TInstant::Max()); + ev->ExtraBlockChecks.emplace_back(2, 10); + SendToBSProxy(edge, info->GroupID, ev.release()); + SendToBSProxy(edge, info->GroupID, new TEvBlobStorage::TEvPut(TLogoBlobID(1, 11, 1, 0, data.size(), 4), data, TInstant::Max())); + }); + { + auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge, false); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::BLOCKED); + res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge, false); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + } + } +} diff --git a/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp b/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp index 1e1bca2fe9c..2e51f3e5bfc 100644 --- a/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp @@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(IncorrectQueries) { TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr)); for(auto [blob, data, status] : blobs) { - static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, 0); + static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, nullptr, nullptr); } static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->Record = proto; @@ -105,7 +105,7 @@ Y_UNIT_TEST_SUITE(IncorrectQueries) { TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr)); for(auto [blob, data, status] : blobs) { - static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, 0); + static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, nullptr, nullptr); } env.WithQueueId(test.Info->GetVDiskInSubgroup(0, blobs[0].BlobId.Hash()), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) { @@ -528,7 +528,7 @@ Y_UNIT_TEST_SUITE(IncorrectQueries) { if (i % 19 != 18) { ++goodCount; TLogoBlobID blob(i, 1, 0, 0, blobSize, 0, 1); - static_cast<TEvBlobStorage::TEvVMultiPut*>(events[i].get())->AddVPut(blob, data, 0); + static_cast<TEvBlobStorage::TEvVMultiPut*>(events[i].get())->AddVPut(blob, data, nullptr, nullptr); } } diff --git a/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp b/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp index b73da3df8db..f7bd796a008 100644 --- a/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp +++ b/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp @@ -448,7 +448,7 @@ class TManyMultiPuts : public TActorBootstrapped<TManyMultiPuts> { TVDiskIdShort mainVDiskId = TIngress::GetMainReplica(&Conf->GroupInfo->GetTopology(), logoBlobID); if (mainVDiskId == VDiskInfo.VDiskID) { ui64 cookieValue = Step; - vMultiPut->AddVPut(logoBlobID, MsgData, &cookieValue); + vMultiPut->AddVPut(logoBlobID, MsgData, &cookieValue, nullptr); putCount++; Step++; diff --git a/ydb/core/blobstorage/ut_vdisk/lib/test_huge.cpp b/ydb/core/blobstorage/ut_vdisk/lib/test_huge.cpp index 9940e6da358..0e41b8b27f4 100644 --- a/ydb/core/blobstorage/ut_vdisk/lib/test_huge.cpp +++ b/ydb/core/blobstorage/ut_vdisk/lib/test_huge.cpp @@ -60,7 +60,7 @@ public: new TEvHullWriteHugeBlob(TActorId(), 0, logoBlobId, TIngress(), TRope(abcdefghkj), false, NKikimrBlobStorage::EPutHandleClass::AsyncBlob, - std::make_unique<TEvBlobStorage::TEvVPutResult>())); + std::make_unique<TEvBlobStorage::TEvVPutResult>(), nullptr)); State = 1; return false; } diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index bfd260b0040..dc52a49d022 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -836,7 +836,8 @@ namespace NKikimr { TRope GetItemBuffer(ui64 itemIdx) const; - void AddVPut(const TLogoBlobID &logoBlobId, const TString &buffer, ui64 *cookie) { + void AddVPut(const TLogoBlobID &logoBlobId, const TString &buffer, ui64 *cookie, + std::vector<std::pair<ui64, ui32>> *extraBlockChecks) { NKikimrBlobStorage::TVMultiPutItem *item = Record.AddItems(); LogoBlobIDFromLogoBlobID(logoBlobId, item->MutableBlobID()); item->SetFullDataSize(logoBlobId.BlobSize()); @@ -845,6 +846,13 @@ namespace NKikimr { if (cookie) { item->SetCookie(*cookie); } + if (extraBlockChecks) { + for (const auto& [tabletId, generation] : *extraBlockChecks) { + auto *p = item->AddExtraBlockChecks(); + p->SetTabletId(tabletId); + p->SetGeneration(generation); + } + } } bool Validate(TString& errorReason) { @@ -2361,6 +2369,7 @@ namespace NKikimr { TEvVCheckReadinessResult(NKikimrProto::EReplyStatus status) { Record.SetStatus(status); + Record.SetExtraBlockChecksSupport(true); } }; diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp index 989c5e920c5..6c154281eb8 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp @@ -201,7 +201,8 @@ namespace NKikimr { ctx.Send(NotifyID, new TEvHullHugeWritten(HugeSlot)); ctx.Send(HugeKeeperCtx->SkeletonId, new TEvHullLogHugeBlob(WriteId, Item->LogoBlobId, Item->Ingress, DiskAddr, - Item->IgnoreBlock, Item->SenderId, Item->Cookie, std::move(Item->Result))); + Item->IgnoreBlock, Item->SenderId, Item->Cookie, std::move(Item->Result), + &Item->ExtraBlockChecks)); LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, "Writer: finish: id# %s diskAddr# %s", diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h index ae6d7536eab..1876855d797 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h @@ -21,6 +21,7 @@ namespace NKikimr { const bool IgnoreBlock; const NKikimrBlobStorage::EPutHandleClass HandleClass; std::unique_ptr<TEvBlobStorage::TEvVPutResult> Result; + NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> ExtraBlockChecks; TEvHullWriteHugeBlob(const TActorId &senderId, ui64 cookie, @@ -29,7 +30,8 @@ namespace NKikimr { TRope&& data, bool ignoreBlock, NKikimrBlobStorage::EPutHandleClass handleClass, - std::unique_ptr<TEvBlobStorage::TEvVPutResult> result) + std::unique_ptr<TEvBlobStorage::TEvVPutResult> result, + NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks) : SenderId(senderId) , Cookie(cookie) , LogoBlobId(logoBlobId) @@ -38,7 +40,11 @@ namespace NKikimr { , IgnoreBlock(ignoreBlock) , HandleClass(handleClass) , Result(std::move(result)) - {} + { + if (extraBlockChecks) { + ExtraBlockChecks.Swap(extraBlockChecks); + } + } ui64 ByteSize() const { return Data.GetSize(); @@ -64,6 +70,7 @@ namespace NKikimr { const TActorId OrigClient; const ui64 OrigCookie; std::unique_ptr<TEvBlobStorage::TEvVPutResult> Result; + NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> ExtraBlockChecks; TEvHullLogHugeBlob(ui64 writeId, const TLogoBlobID &logoBlobID, @@ -72,7 +79,8 @@ namespace NKikimr { bool ignoreBlock, const TActorId &origClient, ui64 origCookie, - std::unique_ptr<TEvBlobStorage::TEvVPutResult> result) + std::unique_ptr<TEvBlobStorage::TEvVPutResult> result, + NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks) : WriteId(writeId) , LogoBlobID(logoBlobID) , Ingress(ingress) @@ -81,8 +89,11 @@ namespace NKikimr { , OrigClient(origClient) , OrigCookie(origCookie) , Result(std::move(result)) - - {} + { + if (extraBlockChecks) { + ExtraBlockChecks.Swap(extraBlockChecks); + } + } }; //////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp index 33a8e1153b5..58c881fb027 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp @@ -161,7 +161,8 @@ namespace NKikimr { THullCheckStatus THull::CheckLogoBlob( const TActorContext &ctx, const TLogoBlobID &id, - bool ignoreBlock) + bool ignoreBlock, + const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks) { // check blocked if (!ignoreBlock) { @@ -174,6 +175,18 @@ namespace NKikimr { case TBlocksCache::EStatus::BLOCKED_INFLIGH: return {NKikimrProto::BLOCKED, "blocked", res.Lsn, true}; } + + for (const auto& item : extraBlockChecks) { + auto res = BlocksCache.IsBlocked(item.GetTabletId(), {item.GetGeneration(), 0}); + switch (res.Status) { + case TBlocksCache::EStatus::OK: + break; + case TBlocksCache::EStatus::BLOCKED_PERS: + return {NKikimrProto::BLOCKED, "blocked", 0, false}; + case TBlocksCache::EStatus::BLOCKED_INFLIGH: + return {NKikimrProto::BLOCKED, "blocked", res.Lsn, true}; + } + } } ValidateWriteQuery(ctx, id); diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h index 309c3d52c2f..60c68a51114 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h @@ -92,7 +92,8 @@ namespace NKikimr { THullCheckStatus CheckLogoBlob( const TActorContext &ctx, const TLogoBlobID &id, - bool ignoreBlock); + bool ignoreBlock, + const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks); void AddLogoBlob( const TActorContext &ctx, diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index a5f5483bbdc..e89088f0864 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -301,12 +301,16 @@ namespace NKikimr { TLsnSeg Lsn = {}; THullCheckStatus HullStatus; bool IsHugeBlob = false; + NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> ExtraBlockChecks; - TVPutInfo(TLogoBlobID blobId, TRope &&buffer) + TVPutInfo(TLogoBlobID blobId, TRope &&buffer, + NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks) : Buffer(std::move(buffer)) , BlobId(blobId) - , HullStatus({NKikimrProto::UNKNOWN, 0 ,false}) - {} + , HullStatus({NKikimrProto::UNKNOWN, 0, false}) + { + ExtraBlockChecks.Swap(extraBlockChecks); + } }; void UpdatePDiskWriteBytes(size_t size) { @@ -378,11 +382,12 @@ namespace NKikimr { std::move(info.Buffer), *Arena); UpdatePDiskWriteBytes(info.Buffer.GetSize()); return std::make_unique<TEvHullWriteHugeBlob>(sender, cookie, info.BlobId, info.Ingress, - std::move(info.Buffer), ignoreBlock, handleClass, std::move(res)); + std::move(info.Buffer), ignoreBlock, handleClass, std::move(res), &info.ExtraBlockChecks); } THullCheckStatus ValidateVPut(const TActorContext &ctx, TString evPrefix, - TLogoBlobID id, ui64 bufSize, bool ignoreBlock) + TLogoBlobID id, ui64 bufSize, bool ignoreBlock, + const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks) { ui64 blobPartSize = 0; try { @@ -411,7 +416,7 @@ namespace NKikimr { return {NKikimrProto::ERROR, "buffer is too large"}; } - auto status = Hull->CheckLogoBlob(ctx, id, ignoreBlock); + auto status = Hull->CheckLogoBlob(ctx, id, ignoreBlock, extraBlockChecks); if (status.Status != NKikimrProto::OK) { LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << evPrefix << ": failed to pass the Hull check;" << " id# " << id @@ -475,9 +480,9 @@ namespace NKikimr { TBatchedVec<TVPutInfo> putsInfo; ui64 lsnCount = 0; for (ui64 itemIdx = 0; itemIdx < record.ItemsSize(); ++itemIdx) { - auto &item = record.GetItems(itemIdx); + auto &item = *record.MutableItems(itemIdx); TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID()); - putsInfo.emplace_back(blobId, ev->Get()->GetItemBuffer(itemIdx)); + putsInfo.emplace_back(blobId, ev->Get()->GetItemBuffer(itemIdx), item.MutableExtraBlockChecks()); TVPutInfo &info = putsInfo.back(); try { @@ -488,7 +493,8 @@ namespace NKikimr { } if (info.HullStatus.Status == NKikimrProto::UNKNOWN) { - info.HullStatus = ValidateVPut(ctx, "TEvVMultiPut", blobId, info.Buffer.GetSize(), ignoreBlock); + info.HullStatus = ValidateVPut(ctx, "TEvVMultiPut", blobId, info.Buffer.GetSize(), ignoreBlock, + info.ExtraBlockChecks); } if (info.HullStatus.Status == NKikimrProto::OK) { @@ -587,7 +593,7 @@ namespace NKikimr { ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, NWilson::TTraceId(traceId)); } else { ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(), - ignoreBlock, vMultiPutActorId, cookie, std::move(result))); + ignoreBlock, vMultiPutActorId, cookie, std::move(result), &info.ExtraBlockChecks)); } } else { Y_VERIFY(lsnBatch.First <= lsnBatch.Last); @@ -648,7 +654,7 @@ namespace NKikimr { const TLogoBlobID id = LogoBlobIDFromLogoBlobID(record.GetBlobID()); LWTRACK(VDiskSkeletonVPutRecieved, ev->Get()->Orbit, VCtx->NodeId, VCtx->GroupId, VCtx->Top->GetFailDomainOrderNumber(VCtx->ShortSelfVDisk), id.TabletID(), id.BlobSize()); - TVPutInfo info(id, ev->Get()->GetBuffer()); + TVPutInfo info(id, ev->Get()->GetBuffer(), record.MutableExtraBlockChecks()); const ui64 bufSize = info.Buffer.GetSize(); try { @@ -674,7 +680,7 @@ namespace NKikimr { return; } - info.HullStatus = ValidateVPut(ctx, "TEvVPut", id, bufSize, ignoreBlock); + info.HullStatus = ValidateVPut(ctx, "TEvVPut", id, bufSize, ignoreBlock, info.ExtraBlockChecks); if (info.HullStatus.Status != NKikimrProto::OK) { ReplyError(info.HullStatus, ev, ctx, now); return; @@ -720,7 +726,7 @@ namespace NKikimr { ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, std::move(ev->TraceId)); } else { ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(), - ignoreBlock, ev->Sender, ev->Cookie, std::move(result))); + ignoreBlock, ev->Sender, ev->Cookie, std::move(result), &info.ExtraBlockChecks)); } } @@ -729,7 +735,7 @@ namespace NKikimr { // update hull write duration msg->Result->MarkHugeWriteTime(); - auto status = Hull->CheckLogoBlob(ctx, msg->LogoBlobID, msg->IgnoreBlock); + auto status = Hull->CheckLogoBlob(ctx, msg->LogoBlobID, msg->IgnoreBlock, msg->ExtraBlockChecks); if (status.Status != NKikimrProto::OK) { msg->Result->UpdateStatus(status.Status); // modify status in result LOG_DEBUG_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix @@ -1541,9 +1547,10 @@ namespace NKikimr { TIngress ingress = *TIngress::CreateIngressWithLocal(VCtx->Top.get(), SelfVDiskId, id); if (buf) { ctx.Send(Db->HugeKeeperID, new TEvHullWriteHugeBlob(ev->Sender, ev->Cookie, id, ingress, std::move(buf), - true, NKikimrBlobStorage::EPutHandleClass::AsyncBlob, std::move(result))); + true, NKikimrBlobStorage::EPutHandleClass::AsyncBlob, std::move(result), nullptr)); } else { - ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, id, ingress, TDiskPart(), true, ev->Sender, ev->Cookie, std::move(result))); + ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, id, ingress, TDiskPart(), true, ev->Sender, ev->Cookie, + std::move(result), nullptr)); } } diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index 632eda54dcb..d9f07a932ac 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -356,6 +356,11 @@ message TEvVInplacePatchResult { } message TEvVPut { + message TExtraBlockCheck { + optional fixed64 TabletId = 1; + optional uint32 Generation = 2; + } + optional NKikimrProto.TLogoBlobID BlobID = 1; optional bytes Buffer = 2; @@ -368,6 +373,8 @@ message TEvVPut { optional EPutHandleClass HandleClass = 9; optional TMsgQoS MsgQoS = 10; optional TTimestamps Timestamps = 23; + + repeated TExtraBlockCheck ExtraBlockChecks = 11; } message TEvVPutResult { @@ -393,6 +400,8 @@ message TVMultiPutItem { optional uint64 FullDataSize = 3; optional uint64 Cookie = 4; + + repeated TEvVPut.TExtraBlockCheck ExtraBlockChecks = 5; } message TEvVMultiPut { @@ -630,6 +639,7 @@ message TEvVCheckReadinessResult { //optional bool SupportProtoWithPayload = 2; optional TMessageId ExpectedMsgId = 3; optional TVDiskCostSettings CostSettings = 4; + optional bool ExtraBlockChecksSupport = 5; optional TGroupInfo RecentGroup = 100; // filled in by VDisk on RACE when VDisk has newer generation } |