aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-12-20 11:52:39 +0300
committeralexvru <alexvru@ydb.tech>2022-12-20 11:52:39 +0300
commit04d2eb996bc6cb6ae828ef1e725ef007c7048ab6 (patch)
treebba7d4e68d358fe30b450ee6366fbdc24b1ea5df
parentfbd79db653a5787d6025b0ed207ebde59748a16d (diff)
downloadydb-04d2eb996bc6cb6ae828ef1e725ef007c7048ab6.tar.gz
Report writes beyond barrier and check them in BlobDepot
-rw-r--r--ydb/core/base/blobstorage.h1
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp8
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put.cpp1
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h4
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_events.h5
-rw-r--r--ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp8
-rw-r--r--ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h5
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp25
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp6
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.h5
-rw-r--r--ydb/core/protos/blobstorage.proto2
11 files changed, 54 insertions, 16 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index b72c2639cc..d7bd827d8a 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -985,6 +985,7 @@ struct TEvBlobStorage {
const ui32 GroupId;
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
TString ErrorReason;
+ bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
mutable NLWTrace::TOrbit Orbit;
TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 30d0ebc8b9..8f52d42e93 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -14,6 +14,7 @@ namespace NKikimr::NBlobDepot {
bool PutsIssued = false;
bool WaitingForCommitBlobSeq = false;
bool IsInFlight = false;
+ bool WrittenBeyondBarrier = false;
NKikimrBlobDepot::TEvCommitBlobSeq CommitBlobSeq;
TBlobSeqId BlobSeqId;
@@ -214,6 +215,10 @@ namespace NKikimr::NBlobDepot {
BDEV_QUERY(BDEV11, "TEvPut_resultFromProxy", (BlobId, msg.Id), (Status, msg.Status),
(ErrorReason, msg.ErrorReason));
+ if (msg.Status == NKikimrProto::OK && msg.WrittenBeyondBarrier) {
+ WrittenBeyondBarrier = true;
+ }
+
--PutsInFlight;
if (msg.Status != NKikimrProto::OK) {
EndWithError(msg.Status, std::move(msg.ErrorReason));
@@ -274,6 +279,9 @@ namespace NKikimr::NBlobDepot {
IssueCommitBlobSeq(false);
}
+ // ensure that blob was written not beyond the barrier, or it will be lost otherwise
+ Y_VERIFY(!WrittenBeyondBarrier);
+
TBlobStorageQuery::EndWithSuccess(std::make_unique<TEvBlobStorage::TEvPutResult>(NKikimrProto::OK, Request.Id,
Agent.GetStorageStatusFlags(), Agent.VirtualGroupId, Agent.GetApproximateFreeSpaceShare()));
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
index 57f9c320a3..5c682482e6 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
@@ -352,6 +352,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
RootCauseTrack.RenderTrack(PutImpl.Blobs[blobIdx].Orbit);
LWTRACK(DSProxyPutReply, PutImpl.Blobs[blobIdx].Orbit);
putResult->Orbit = std::move(PutImpl.Blobs[blobIdx].Orbit);
+ putResult->WrittenBeyondBarrier = PutImpl.WrittenBeyondBarrier[blobIdx];
if (!IsManyPuts) {
SendResponse(std::move(putResult), TimeStatsEnabled ? &TimeStats : nullptr);
} else {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
index 8622f50e67..b9ef53f45a 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
@@ -29,6 +29,7 @@ private:
TBlackboard Blackboard;
TBatchedVec<bool> IsDone;
+ TBatchedVec<bool> WrittenBeyondBarrier;
TStorageStatusFlags StatusFlags;
float ApproximateFreeSpaceShare;
TIntrusivePtr<TBlobStorageGroupProxyMon> Mon;
@@ -109,6 +110,7 @@ public:
, Info(info)
, Blackboard(info, state, ev->HandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead, false)
, IsDone(1)
+ , WrittenBeyondBarrier(1)
, StatusFlags(0)
, ApproximateFreeSpaceShare(0.f)
, Mon(mon)
@@ -131,6 +133,7 @@ public:
, Info(info)
, Blackboard(info, state, putHandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead, false)
, IsDone(events.size())
+ , WrittenBeyondBarrier(events.size())
, StatusFlags(0)
, ApproximateFreeSpaceShare(0.f)
, Mon(mon)
@@ -304,6 +307,7 @@ public:
case NKikimrProto::OK:
case NKikimrProto::ALREADY:
Blackboard.AddPutOkResponse(blobId, orderNumber);
+ WrittenBeyondBarrier[blobIdx] = record.GetWrittenBeyondBarrier();
break;
default:
ErrorDescription = TStringBuilder() << "Unexpected status# " << status;
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
index 6eedc4343d..f5521c25be 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
@@ -938,7 +938,7 @@ namespace NKikimr {
}
void AddVPutResult(NKikimrProto::EReplyStatus status, const TString& errorReason, const TLogoBlobID &logoBlobId,
- ui64 *cookie, ui32 statusFlags = 0)
+ ui64 *cookie, ui32 statusFlags = 0, bool writtenBeyondBarrier = false)
{
NKikimrBlobStorage::TVMultiPutResultItem *item = Record.AddItems();
item->SetStatus(status);
@@ -950,6 +950,9 @@ namespace NKikimr {
item->SetCookie(*cookie);
}
item->SetStatusFlags(statusFlags);
+ if (writtenBeyondBarrier) {
+ item->SetWrittenBeyondBarrier(true);
+ }
}
void MakeError(NKikimrProto::EReplyStatus status, const TString& errorReason,
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
index 940cae78c9..3737df1151 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
@@ -91,7 +91,7 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////
// Private
////////////////////////////////////////////////////////////////////////////
- void THull::ValidateWriteQuery(const TActorContext &ctx, const TLogoBlobID &id) {
+ void THull::ValidateWriteQuery(const TActorContext &ctx, const TLogoBlobID &id, bool *writtenBeyondBarrier) {
if (Fields->BarrierValidation) {
// ensure that the new blob would not fall under GC
TString explanation;
@@ -100,6 +100,7 @@ namespace NKikimr {
VDISKP(HullDs->HullCtx->VCtx->VDiskLogPrefix,
"Db# LogoBlobs; putting blob beyond the barrier id# %s barrier# %s",
id.ToString().data(), explanation.data()));
+ *writtenBeyondBarrier = true;
}
}
}
@@ -163,7 +164,8 @@ namespace NKikimr {
const TActorContext &ctx,
const TLogoBlobID &id,
bool ignoreBlock,
- const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks)
+ const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks,
+ bool *writtenBeyondBarrier)
{
// check blocked
if (!ignoreBlock) {
@@ -190,7 +192,7 @@ namespace NKikimr {
}
}
- ValidateWriteQuery(ctx, id);
+ ValidateWriteQuery(ctx, id, writtenBeyondBarrier);
return {NKikimrProto::OK, 0, false};
}
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h
index e5a0170137..c6d74408e3 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h
@@ -41,7 +41,7 @@ namespace NKikimr {
struct TFields;
std::unique_ptr<TFields> Fields;
- void ValidateWriteQuery(const TActorContext &ctx, const TLogoBlobID &id);
+ void ValidateWriteQuery(const TActorContext &ctx, const TLogoBlobID &id, bool *writtenBeyondBarrier);
// validate GC barrier command against existing barriers metabase (ensure that keys are
// coming in ascending order, CollectGen/CollectStep pairs do not decrease and that keys
@@ -95,7 +95,8 @@ namespace NKikimr {
const TActorContext &ctx,
const TLogoBlobID &id,
bool ignoreBlock,
- const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks);
+ const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks,
+ bool *writtenBeyondBarrier);
void AddLogoBlob(
const TActorContext &ctx,
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index 33d6a39c24..df0158e208 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -326,6 +326,7 @@ namespace NKikimr {
bool IsHugeBlob = false;
NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> ExtraBlockChecks;
NWilson::TTraceId TraceId;
+ bool WrittenBeyondBarrier = false;
TVPutInfo(TLogoBlobID blobId, TRope &&buffer,
NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks,
@@ -407,7 +408,8 @@ namespace NKikimr {
THullCheckStatus ValidateVPut(const TActorContext &ctx, TString evPrefix,
TLogoBlobID id, ui64 bufSize, bool ignoreBlock,
- const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks)
+ const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks,
+ bool *writtenBeyondBarrier)
{
ui64 blobPartSize = 0;
try {
@@ -436,7 +438,7 @@ namespace NKikimr {
return {NKikimrProto::ERROR, "buffer is too large"};
}
- auto status = Hull->CheckLogoBlob(ctx, id, ignoreBlock, extraBlockChecks);
+ auto status = Hull->CheckLogoBlob(ctx, id, ignoreBlock, extraBlockChecks, writtenBeyondBarrier);
if (status.Status != NKikimrProto::OK) {
LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << evPrefix << ": failed to pass the Hull check;"
<< " id# " << id
@@ -515,7 +517,7 @@ namespace NKikimr {
if (info.HullStatus.Status == NKikimrProto::UNKNOWN) {
info.HullStatus = ValidateVPut(ctx, "TEvVMultiPut", blobId, info.Buffer.GetSize(), ignoreBlock,
- info.ExtraBlockChecks);
+ info.ExtraBlockChecks, &info.WrittenBeyondBarrier);
}
if (info.HullStatus.Status == NKikimrProto::OK) {
@@ -586,7 +588,8 @@ namespace NKikimr {
const TString& errorReason = info.HullStatus.ErrorReason;
if (info.HullStatus.Postponed) {
- auto result = std::make_unique<TEvVMultiPutItemResult>(info.BlobId, itemIdx, status, errorReason);
+ auto result = std::make_unique<TEvVMultiPutItemResult>(info.BlobId, itemIdx, status, errorReason,
+ info.WrittenBeyondBarrier);
Hull->PostponeReplyUntilCommitted(result.release(), vMultiPutActorId, itemIdx, std::move(info.TraceId),
info.HullStatus.Lsn);
continue;
@@ -622,8 +625,8 @@ namespace NKikimr {
info.Lsn = TLsnSeg(lsnBatch.First, lsnBatch.First);
lsnBatch.First++;
- std::unique_ptr<TEvVMultiPutItemResult> evItemResult(
- new TEvVMultiPutItemResult(info.BlobId, itemIdx, status, errorReason));
+ auto evItemResult = std::make_unique<TEvVMultiPutItemResult>(info.BlobId, itemIdx, status, errorReason,
+ info.WrittenBeyondBarrier);
auto logMsg = CreatePutLogEvent(ctx, "TEvVMultiPut", vMultiPutActorId, cookie, std::move(orbit),
info, std::move(evItemResult));
evLogs->AddLog(THolder<NPDisk::TEvLog>(logMsg.release()));
@@ -702,7 +705,7 @@ namespace NKikimr {
return;
}
- info.HullStatus = ValidateVPut(ctx, "TEvVPut", id, bufSize, ignoreBlock, info.ExtraBlockChecks);
+ info.HullStatus = ValidateVPut(ctx, "TEvVPut", id, bufSize, ignoreBlock, info.ExtraBlockChecks, &info.WrittenBeyondBarrier);
if (info.HullStatus.Status != NKikimrProto::OK) {
ReplyError(info.HullStatus, ev, ctx, now);
return;
@@ -732,6 +735,9 @@ namespace NKikimr {
// no more errors (at least for for log writes)
std::unique_ptr<TEvBlobStorage::TEvVPutResult> result = CreateResult(VCtx, NKikimrProto::OK, TString(), ev, now,
SkeletonFrontIDPtr, SelfVDiskId, Db->GetVDiskIncarnationGuid());
+ if (info.WrittenBeyondBarrier) {
+ result->Record.SetWrittenBeyondBarrier(true);
+ }
// Manage PDisk scheduler weights
OverloadHandler->ActualizeWeights(ctx, Mask(EHullDbType::LogoBlobs));
@@ -757,7 +763,8 @@ namespace NKikimr {
// update hull write duration
msg->Result->MarkHugeWriteTime();
- auto status = Hull->CheckLogoBlob(ctx, msg->LogoBlobID, msg->IgnoreBlock, msg->ExtraBlockChecks);
+ bool writtenBeyondBarrier = false;
+ auto status = Hull->CheckLogoBlob(ctx, msg->LogoBlobID, msg->IgnoreBlock, msg->ExtraBlockChecks, &writtenBeyondBarrier);
if (status.Status != NKikimrProto::OK) {
msg->Result->UpdateStatus(status.Status); // modify status in result
LOG_DEBUG_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix
@@ -774,6 +781,8 @@ namespace NKikimr {
}
return;
+ } else if (writtenBeyondBarrier) {
+ msg->Result->Record.SetWrittenBeyondBarrier(true);
}
#ifdef OPTIMIZE_SYNC
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp
index e34428b8ed..1f3e7fee0f 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp
@@ -17,6 +17,7 @@ namespace NKikimr {
ui32 StatusFlags = 0;
bool Received = false;
bool HasCookie = false;
+ bool WrittenBeyondBarrier;
TString ToString() const {
return TStringBuilder()
@@ -28,6 +29,7 @@ namespace NKikimr {
<< " Cookie# " << Cookie
<< " StatusFlags# " << NPDisk::StatusFlagsToString(StatusFlags)
<< " Received# " << Received
+ << " WrittenBeyondBarrier# " << WrittenBeyondBarrier
<< " }";
}
};
@@ -85,7 +87,7 @@ namespace NKikimr {
for (ui64 idx = 0; idx < Items.size(); ++idx) {
TItem &result = Items[idx];
vMultiPutResult->AddVPutResult(result.Status, result.ErrorReason, result.BlobId,
- result.HasCookie ? &result.Cookie : nullptr, result.StatusFlags);
+ result.HasCookie ? &result.Cookie : nullptr, result.StatusFlags, result.WrittenBeyondBarrier);
}
vMultiPutResult->Record.SetStatusFlags(OOSStatus.Flags);
@@ -105,6 +107,7 @@ namespace NKikimr {
item.Received = true;
item.Status = ev->Get()->Status;
item.ErrorReason = ev->Get()->ErrorReason;
+ item.WrittenBeyondBarrier = ev->Get()->WrittenBeyondBarrier;
ReceivedResults++;
@@ -128,6 +131,7 @@ namespace NKikimr {
Y_VERIFY(record.HasStatus());
item.Status = record.GetStatus();
item.ErrorReason = record.GetErrorReason();
+ item.WrittenBeyondBarrier = record.GetWrittenBeyondBarrier();
ReceivedResults++;
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.h
index 18d01db17e..6ed7199459 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.h
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.h
@@ -13,13 +13,16 @@ struct TEvVMultiPutItemResult : TEventLocal<TEvVMultiPutItemResult, TEvBlobStora
ui64 ItemIdx;
NKikimrProto::EReplyStatus Status;
TString ErrorReason;
+ bool WrittenBeyondBarrier;
- TEvVMultiPutItemResult(TLogoBlobID id, ui64 itemIdx, NKikimrProto::EReplyStatus status, TString errorReason)
+ TEvVMultiPutItemResult(TLogoBlobID id, ui64 itemIdx, NKikimrProto::EReplyStatus status, TString errorReason,
+ bool writtenBeyondBarrier)
: TEventLocal()
, BlobId(id)
, ItemIdx(itemIdx)
, Status(status)
, ErrorReason(std::move(errorReason))
+ , WrittenBeyondBarrier(writtenBeyondBarrier)
{}
};
diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto
index a514e3cda1..2e7d41db24 100644
--- a/ydb/core/protos/blobstorage.proto
+++ b/ydb/core/protos/blobstorage.proto
@@ -392,6 +392,7 @@ message TEvVPutResult {
optional NKikimrProto.EReplyStatus Status = 1;
optional string ErrorReason = 200; // textual description of error
optional NKikimrProto.TLogoBlobID BlobID = 2;
+ optional bool WrittenBeyondBarrier = 6;
optional NKikimrBlobStorage.TVDiskID VDiskID = 3;
optional uint64 Cookie = 4;
@@ -434,6 +435,7 @@ message TVMultiPutResultItem {
optional NKikimrProto.EReplyStatus Status = 1;
optional string ErrorReason = 200;
optional NKikimrProto.TLogoBlobID BlobID = 2;
+ optional bool WrittenBeyondBarrier = 5;
optional uint64 Cookie = 3;
optional uint32 StatusFlags = 4 [default = 0];