diff options
author | alexvru <alexvru@ydb.tech> | 2022-12-20 11:52:39 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-12-20 11:52:39 +0300 |
commit | 04d2eb996bc6cb6ae828ef1e725ef007c7048ab6 (patch) | |
tree | bba7d4e68d358fe30b450ee6366fbdc24b1ea5df | |
parent | fbd79db653a5787d6025b0ed207ebde59748a16d (diff) | |
download | ydb-04d2eb996bc6cb6ae828ef1e725ef007c7048ab6.tar.gz |
Report writes beyond barrier and check them in BlobDepot
11 files changed, 54 insertions, 16 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index b72c2639cc..d7bd827d8a 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -985,6 +985,7 @@ struct TEvBlobStorage { const ui32 GroupId; const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained' TString ErrorReason; + bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier? mutable NLWTrace::TOrbit Orbit; TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags, diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 30d0ebc8b9..8f52d42e93 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -14,6 +14,7 @@ namespace NKikimr::NBlobDepot { bool PutsIssued = false; bool WaitingForCommitBlobSeq = false; bool IsInFlight = false; + bool WrittenBeyondBarrier = false; NKikimrBlobDepot::TEvCommitBlobSeq CommitBlobSeq; TBlobSeqId BlobSeqId; @@ -214,6 +215,10 @@ namespace NKikimr::NBlobDepot { BDEV_QUERY(BDEV11, "TEvPut_resultFromProxy", (BlobId, msg.Id), (Status, msg.Status), (ErrorReason, msg.ErrorReason)); + if (msg.Status == NKikimrProto::OK && msg.WrittenBeyondBarrier) { + WrittenBeyondBarrier = true; + } + --PutsInFlight; if (msg.Status != NKikimrProto::OK) { EndWithError(msg.Status, std::move(msg.ErrorReason)); @@ -274,6 +279,9 @@ namespace NKikimr::NBlobDepot { IssueCommitBlobSeq(false); } + // ensure that blob was written not beyond the barrier, or it will be lost otherwise + Y_VERIFY(!WrittenBeyondBarrier); + TBlobStorageQuery::EndWithSuccess(std::make_unique<TEvBlobStorage::TEvPutResult>(NKikimrProto::OK, Request.Id, Agent.GetStorageStatusFlags(), Agent.VirtualGroupId, Agent.GetApproximateFreeSpaceShare())); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index 57f9c320a3..5c682482e6 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -352,6 +352,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt RootCauseTrack.RenderTrack(PutImpl.Blobs[blobIdx].Orbit); LWTRACK(DSProxyPutReply, PutImpl.Blobs[blobIdx].Orbit); putResult->Orbit = std::move(PutImpl.Blobs[blobIdx].Orbit); + putResult->WrittenBeyondBarrier = PutImpl.WrittenBeyondBarrier[blobIdx]; if (!IsManyPuts) { SendResponse(std::move(putResult), TimeStatsEnabled ? &TimeStats : nullptr); } else { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h index 8622f50e67..b9ef53f45a 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h @@ -29,6 +29,7 @@ private: TBlackboard Blackboard; TBatchedVec<bool> IsDone; + TBatchedVec<bool> WrittenBeyondBarrier; TStorageStatusFlags StatusFlags; float ApproximateFreeSpaceShare; TIntrusivePtr<TBlobStorageGroupProxyMon> Mon; @@ -109,6 +110,7 @@ public: , Info(info) , Blackboard(info, state, ev->HandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead, false) , IsDone(1) + , WrittenBeyondBarrier(1) , StatusFlags(0) , ApproximateFreeSpaceShare(0.f) , Mon(mon) @@ -131,6 +133,7 @@ public: , Info(info) , Blackboard(info, state, putHandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead, false) , IsDone(events.size()) + , WrittenBeyondBarrier(events.size()) , StatusFlags(0) , ApproximateFreeSpaceShare(0.f) , Mon(mon) @@ -304,6 +307,7 @@ public: case NKikimrProto::OK: case NKikimrProto::ALREADY: Blackboard.AddPutOkResponse(blobId, orderNumber); + WrittenBeyondBarrier[blobIdx] = record.GetWrittenBeyondBarrier(); break; default: ErrorDescription = TStringBuilder() << "Unexpected status# " << status; diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index 6eedc4343d..f5521c25be 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -938,7 +938,7 @@ namespace NKikimr { } void AddVPutResult(NKikimrProto::EReplyStatus status, const TString& errorReason, const TLogoBlobID &logoBlobId, - ui64 *cookie, ui32 statusFlags = 0) + ui64 *cookie, ui32 statusFlags = 0, bool writtenBeyondBarrier = false) { NKikimrBlobStorage::TVMultiPutResultItem *item = Record.AddItems(); item->SetStatus(status); @@ -950,6 +950,9 @@ namespace NKikimr { item->SetCookie(*cookie); } item->SetStatusFlags(statusFlags); + if (writtenBeyondBarrier) { + item->SetWrittenBeyondBarrier(true); + } } void MakeError(NKikimrProto::EReplyStatus status, const TString& errorReason, diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp index 940cae78c9..3737df1151 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp @@ -91,7 +91,7 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////// // Private //////////////////////////////////////////////////////////////////////////// - void THull::ValidateWriteQuery(const TActorContext &ctx, const TLogoBlobID &id) { + void THull::ValidateWriteQuery(const TActorContext &ctx, const TLogoBlobID &id, bool *writtenBeyondBarrier) { if (Fields->BarrierValidation) { // ensure that the new blob would not fall under GC TString explanation; @@ -100,6 +100,7 @@ namespace NKikimr { VDISKP(HullDs->HullCtx->VCtx->VDiskLogPrefix, "Db# LogoBlobs; putting blob beyond the barrier id# %s barrier# %s", id.ToString().data(), explanation.data())); + *writtenBeyondBarrier = true; } } } @@ -163,7 +164,8 @@ namespace NKikimr { const TActorContext &ctx, const TLogoBlobID &id, bool ignoreBlock, - const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks) + const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks, + bool *writtenBeyondBarrier) { // check blocked if (!ignoreBlock) { @@ -190,7 +192,7 @@ namespace NKikimr { } } - ValidateWriteQuery(ctx, id); + ValidateWriteQuery(ctx, id, writtenBeyondBarrier); return {NKikimrProto::OK, 0, false}; } diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h index e5a0170137..c6d74408e3 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h @@ -41,7 +41,7 @@ namespace NKikimr { struct TFields; std::unique_ptr<TFields> Fields; - void ValidateWriteQuery(const TActorContext &ctx, const TLogoBlobID &id); + void ValidateWriteQuery(const TActorContext &ctx, const TLogoBlobID &id, bool *writtenBeyondBarrier); // validate GC barrier command against existing barriers metabase (ensure that keys are // coming in ascending order, CollectGen/CollectStep pairs do not decrease and that keys @@ -95,7 +95,8 @@ namespace NKikimr { const TActorContext &ctx, const TLogoBlobID &id, bool ignoreBlock, - const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks); + const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks, + bool *writtenBeyondBarrier); void AddLogoBlob( const TActorContext &ctx, diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index 33d6a39c24..df0158e208 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -326,6 +326,7 @@ namespace NKikimr { bool IsHugeBlob = false; NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> ExtraBlockChecks; NWilson::TTraceId TraceId; + bool WrittenBeyondBarrier = false; TVPutInfo(TLogoBlobID blobId, TRope &&buffer, NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks, @@ -407,7 +408,8 @@ namespace NKikimr { THullCheckStatus ValidateVPut(const TActorContext &ctx, TString evPrefix, TLogoBlobID id, ui64 bufSize, bool ignoreBlock, - const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks) + const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks, + bool *writtenBeyondBarrier) { ui64 blobPartSize = 0; try { @@ -436,7 +438,7 @@ namespace NKikimr { return {NKikimrProto::ERROR, "buffer is too large"}; } - auto status = Hull->CheckLogoBlob(ctx, id, ignoreBlock, extraBlockChecks); + auto status = Hull->CheckLogoBlob(ctx, id, ignoreBlock, extraBlockChecks, writtenBeyondBarrier); if (status.Status != NKikimrProto::OK) { LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << evPrefix << ": failed to pass the Hull check;" << " id# " << id @@ -515,7 +517,7 @@ namespace NKikimr { if (info.HullStatus.Status == NKikimrProto::UNKNOWN) { info.HullStatus = ValidateVPut(ctx, "TEvVMultiPut", blobId, info.Buffer.GetSize(), ignoreBlock, - info.ExtraBlockChecks); + info.ExtraBlockChecks, &info.WrittenBeyondBarrier); } if (info.HullStatus.Status == NKikimrProto::OK) { @@ -586,7 +588,8 @@ namespace NKikimr { const TString& errorReason = info.HullStatus.ErrorReason; if (info.HullStatus.Postponed) { - auto result = std::make_unique<TEvVMultiPutItemResult>(info.BlobId, itemIdx, status, errorReason); + auto result = std::make_unique<TEvVMultiPutItemResult>(info.BlobId, itemIdx, status, errorReason, + info.WrittenBeyondBarrier); Hull->PostponeReplyUntilCommitted(result.release(), vMultiPutActorId, itemIdx, std::move(info.TraceId), info.HullStatus.Lsn); continue; @@ -622,8 +625,8 @@ namespace NKikimr { info.Lsn = TLsnSeg(lsnBatch.First, lsnBatch.First); lsnBatch.First++; - std::unique_ptr<TEvVMultiPutItemResult> evItemResult( - new TEvVMultiPutItemResult(info.BlobId, itemIdx, status, errorReason)); + auto evItemResult = std::make_unique<TEvVMultiPutItemResult>(info.BlobId, itemIdx, status, errorReason, + info.WrittenBeyondBarrier); auto logMsg = CreatePutLogEvent(ctx, "TEvVMultiPut", vMultiPutActorId, cookie, std::move(orbit), info, std::move(evItemResult)); evLogs->AddLog(THolder<NPDisk::TEvLog>(logMsg.release())); @@ -702,7 +705,7 @@ namespace NKikimr { return; } - info.HullStatus = ValidateVPut(ctx, "TEvVPut", id, bufSize, ignoreBlock, info.ExtraBlockChecks); + info.HullStatus = ValidateVPut(ctx, "TEvVPut", id, bufSize, ignoreBlock, info.ExtraBlockChecks, &info.WrittenBeyondBarrier); if (info.HullStatus.Status != NKikimrProto::OK) { ReplyError(info.HullStatus, ev, ctx, now); return; @@ -732,6 +735,9 @@ namespace NKikimr { // no more errors (at least for for log writes) std::unique_ptr<TEvBlobStorage::TEvVPutResult> result = CreateResult(VCtx, NKikimrProto::OK, TString(), ev, now, SkeletonFrontIDPtr, SelfVDiskId, Db->GetVDiskIncarnationGuid()); + if (info.WrittenBeyondBarrier) { + result->Record.SetWrittenBeyondBarrier(true); + } // Manage PDisk scheduler weights OverloadHandler->ActualizeWeights(ctx, Mask(EHullDbType::LogoBlobs)); @@ -757,7 +763,8 @@ namespace NKikimr { // update hull write duration msg->Result->MarkHugeWriteTime(); - auto status = Hull->CheckLogoBlob(ctx, msg->LogoBlobID, msg->IgnoreBlock, msg->ExtraBlockChecks); + bool writtenBeyondBarrier = false; + auto status = Hull->CheckLogoBlob(ctx, msg->LogoBlobID, msg->IgnoreBlock, msg->ExtraBlockChecks, &writtenBeyondBarrier); if (status.Status != NKikimrProto::OK) { msg->Result->UpdateStatus(status.Status); // modify status in result LOG_DEBUG_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix @@ -774,6 +781,8 @@ namespace NKikimr { } return; + } else if (writtenBeyondBarrier) { + msg->Result->Record.SetWrittenBeyondBarrier(true); } #ifdef OPTIMIZE_SYNC diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp index e34428b8ed..1f3e7fee0f 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp @@ -17,6 +17,7 @@ namespace NKikimr { ui32 StatusFlags = 0; bool Received = false; bool HasCookie = false; + bool WrittenBeyondBarrier; TString ToString() const { return TStringBuilder() @@ -28,6 +29,7 @@ namespace NKikimr { << " Cookie# " << Cookie << " StatusFlags# " << NPDisk::StatusFlagsToString(StatusFlags) << " Received# " << Received + << " WrittenBeyondBarrier# " << WrittenBeyondBarrier << " }"; } }; @@ -85,7 +87,7 @@ namespace NKikimr { for (ui64 idx = 0; idx < Items.size(); ++idx) { TItem &result = Items[idx]; vMultiPutResult->AddVPutResult(result.Status, result.ErrorReason, result.BlobId, - result.HasCookie ? &result.Cookie : nullptr, result.StatusFlags); + result.HasCookie ? &result.Cookie : nullptr, result.StatusFlags, result.WrittenBeyondBarrier); } vMultiPutResult->Record.SetStatusFlags(OOSStatus.Flags); @@ -105,6 +107,7 @@ namespace NKikimr { item.Received = true; item.Status = ev->Get()->Status; item.ErrorReason = ev->Get()->ErrorReason; + item.WrittenBeyondBarrier = ev->Get()->WrittenBeyondBarrier; ReceivedResults++; @@ -128,6 +131,7 @@ namespace NKikimr { Y_VERIFY(record.HasStatus()); item.Status = record.GetStatus(); item.ErrorReason = record.GetErrorReason(); + item.WrittenBeyondBarrier = record.GetWrittenBeyondBarrier(); ReceivedResults++; diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.h index 18d01db17e..6ed7199459 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.h +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.h @@ -13,13 +13,16 @@ struct TEvVMultiPutItemResult : TEventLocal<TEvVMultiPutItemResult, TEvBlobStora ui64 ItemIdx; NKikimrProto::EReplyStatus Status; TString ErrorReason; + bool WrittenBeyondBarrier; - TEvVMultiPutItemResult(TLogoBlobID id, ui64 itemIdx, NKikimrProto::EReplyStatus status, TString errorReason) + TEvVMultiPutItemResult(TLogoBlobID id, ui64 itemIdx, NKikimrProto::EReplyStatus status, TString errorReason, + bool writtenBeyondBarrier) : TEventLocal() , BlobId(id) , ItemIdx(itemIdx) , Status(status) , ErrorReason(std::move(errorReason)) + , WrittenBeyondBarrier(writtenBeyondBarrier) {} }; diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index a514e3cda1..2e7d41db24 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -392,6 +392,7 @@ message TEvVPutResult { optional NKikimrProto.EReplyStatus Status = 1; optional string ErrorReason = 200; // textual description of error optional NKikimrProto.TLogoBlobID BlobID = 2; + optional bool WrittenBeyondBarrier = 6; optional NKikimrBlobStorage.TVDiskID VDiskID = 3; optional uint64 Cookie = 4; @@ -434,6 +435,7 @@ message TVMultiPutResultItem { optional NKikimrProto.EReplyStatus Status = 1; optional string ErrorReason = 200; optional NKikimrProto.TLogoBlobID BlobID = 2; + optional bool WrittenBeyondBarrier = 5; optional uint64 Cookie = 3; optional uint32 StatusFlags = 4 [default = 0]; |