aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-06-28 13:44:34 +0300
committerAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-06-28 13:44:34 +0300
commit3f2d373fed52ba10087ee33afe3842f6a678e518 (patch)
tree2ea5eb8766b6624d5bb4da655b96f27bb4467a57
parentb13fbd750496fa994449f37c8c8a49bed18ccd6e (diff)
downloadydb-3f2d373fed52ba10087ee33afe3842f6a678e518.tar.gz
Support ExtraBlockChecks in EvPuts KIKIMR-14867
ref:4acb5bec675beb89cfeb968755c7f60155644e3f
-rw-r--r--ydb/core/base/blobstorage.h1
-rw-r--r--ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp9
-rw-r--r--ydb/core/blobstorage/backpressure/queue_backpressure_client.h6
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp14
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h11
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put.cpp25
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp20
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h71
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_state.cpp3
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp3
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h2
-rw-r--r--ydb/core/blobstorage/dsproxy/group_sessions.cpp10
-rw-r--r--ydb/core/blobstorage/dsproxy/group_sessions.h26
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/CMakeLists.linux.txt1
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp62
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp6
-rw-r--r--ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp2
-rw-r--r--ydb/core/blobstorage/ut_vdisk/lib/test_huge.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_events.h11
-rw-r--r--ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp3
-rw-r--r--ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h21
-rw-r--r--ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp15
-rw-r--r--ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h3
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp39
-rw-r--r--ydb/core/protos/blobstorage.proto10
27 files changed, 295 insertions, 84 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index c9e6504c45f..dd442e1724e 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -890,6 +890,7 @@ struct TEvBlobStorage {
const ETactic Tactic;
mutable NLWTrace::TOrbit Orbit;
ui32 RestartCounter = 0;
+ std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; // (TabletId, Generation) pairs
TEvPut(const TLogoBlobID &id, const TString &buffer, TInstant deadline,
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
index ed38409713b..f4ae30da968 100644
--- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
+++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
@@ -66,6 +66,8 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
return "<unknown>";
}
+ bool ExtraBlockChecksSupport = false;
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::BS_QUEUE_ACTOR;
@@ -453,7 +455,7 @@ private:
case EState::READY:
QLOG_NOTICE_S("BSQ96", "connection lost status# " << NKikimrProto::EReplyStatus_Name(status)
<< " errorReason# " << errorReason << " timeout# " << timeout);
- ctx.Send(BlobStorageProxy, new TEvProxyQueueState(VDiskId, QueueId, false));
+ ctx.Send(BlobStorageProxy, new TEvProxyQueueState(VDiskId, QueueId, false, false));
Queue.DrainQueue(status, TStringBuilder() << "BS_QUEUE: " << errorReason, ctx);
DrainStatus(status, ctx);
break;
@@ -551,7 +553,8 @@ private:
const auto& record = ev->Get()->Record;
if (record.GetStatus() != NKikimrProto::NOTREADY) {
- ctx.Send(BlobStorageProxy, new TEvProxyQueueState(VDiskId, QueueId, true));
+ ExtraBlockChecksSupport = record.GetExtraBlockChecksSupport();
+ ctx.Send(BlobStorageProxy, new TEvProxyQueueState(VDiskId, QueueId, true, ExtraBlockChecksSupport));
if (record.HasExpectedMsgId()) {
Queue.SetMessageId(NBackpressure::TMessageId(record.GetExpectedMsgId()));
}
@@ -758,7 +761,7 @@ private:
<< " RemoteVDisk# " << RemoteVDisk
<< " VDiskId# " << VDiskId
<< " IsConnected# " << isConnected);
- ctx.Send(ev->Sender, new TEvProxyQueueState(VDiskId, QueueId, isConnected));
+ ctx.Send(ev->Sender, new TEvProxyQueueState(VDiskId, QueueId, isConnected, isConnected && ExtraBlockChecksSupport));
}
#define QueueRequestHFunc(TEvType) \
diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.h b/ydb/core/blobstorage/backpressure/queue_backpressure_client.h
index e6cc006a7a9..74c0f13edc7 100644
--- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.h
+++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.h
@@ -16,11 +16,14 @@ namespace NKikimr {
TVDiskID VDiskId;
NKikimrBlobStorage::EVDiskQueueId QueueId;
bool IsConnected;
+ bool ExtraBlockChecksSupport;
- TEvProxyQueueState(const TVDiskID &vDiskId, NKikimrBlobStorage::EVDiskQueueId queueId, bool isConnected)
+ TEvProxyQueueState(const TVDiskID &vDiskId, NKikimrBlobStorage::EVDiskQueueId queueId, bool isConnected,
+ bool extraBlockChecksSupport)
: VDiskId(vDiskId)
, QueueId(queueId)
, IsConnected(isConnected)
+ , ExtraBlockChecksSupport(extraBlockChecksSupport)
{}
TString ToString() const {
@@ -28,6 +31,7 @@ namespace NKikimr {
str << "{VDiskId# " << VDiskId.ToString();
str << " QueueId# " << static_cast<ui32>(QueueId);
str << " IsConnected# " << (IsConnected ? "true" : "false");
+ str << " ExtraBlockChecksSupport# " << (ExtraBlockChecksSupport ? "true" : "false");
str << "}";
return str.Str();
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
index eb6d8394f02..344fde247ef 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
@@ -346,9 +346,11 @@ void TGroupDiskRequests::AddGet(const ui32 diskOrderNumber, const TLogoBlobID &i
}
void TGroupDiskRequests::AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TString buffer,
- TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx) {
+ TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks,
+ ui8 blobIdx) {
Y_VERIFY(diskOrderNumber < DiskRequestsForOrderNumber.size());
- DiskRequestsForOrderNumber[diskOrderNumber].PutsToSend.emplace_back(id, buffer, putReason, isHandoff, blobIdx);
+ DiskRequestsForOrderNumber[diskOrderNumber].PutsToSend.emplace_back(id, buffer, putReason, isHandoff,
+ extraBlockChecks, blobIdx);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -378,13 +380,17 @@ void TBlackboard::AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize) {
}
}
-void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData) {
+void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData,
+ std::vector<std::pair<ui64, ui32>> *extraBlockChecks) {
Y_VERIFY(bool(id));
Y_VERIFY(id.PartId() == 0);
Y_VERIFY(id.BlobSize() != 0);
TBlobState &state = BlobStates[id];
- if (!bool(state.Id)) {
+ if (!state.Id) {
state.Init(id, *Info);
+ state.ExtraBlockChecks = extraBlockChecks;
+ } else {
+ Y_VERIFY(state.ExtraBlockChecks == extraBlockChecks);
}
state.AddPartToPut(partIdx, partData);
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
index fe926648a83..cbffa37eb0c 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
@@ -81,6 +81,7 @@ struct TBlobState {
NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN;
bool IsChanged = false;
bool IsDone = false;
+ std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks = nullptr;
void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info);
void AddNeeded(ui64 begin, ui64 size);
@@ -127,13 +128,16 @@ struct TDiskPutRequest {
EPutReason Reason;
bool IsHandoff;
ui8 BlobIdx;
+ std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks;
- TDiskPutRequest(const TLogoBlobID &id, TString buffer, EPutReason reason, bool isHandoff, ui8 blobIdx = 0)
+ TDiskPutRequest(const TLogoBlobID &id, TString buffer, EPutReason reason, bool isHandoff,
+ std::vector<std::pair<ui64, ui32>> *extraBlockChecks, ui8 blobIdx = 0)
: Id(id)
, Buffer(std::move(buffer))
, Reason(reason)
, IsHandoff(isHandoff)
, BlobIdx(blobIdx)
+ , ExtraBlockChecks(extraBlockChecks)
{}
};
@@ -151,7 +155,8 @@ struct TGroupDiskRequests {
void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet);
void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const ui32 shift, const ui32 size);
void AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TString buffer,
- TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx = 0);
+ TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks,
+ ui8 blobIdx = 0);
};
struct TBlackboard;
@@ -194,7 +199,7 @@ struct TBlackboard {
{}
void AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize);
- void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData);
+ void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData, std::vector<std::pair<ui64, ui32>> *extraBlockChecks);
void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0);
void MoveBlobStateToDone(const TLogoBlobID &id);
void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data);
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
index 0583b454df6..f1d01776b6c 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
@@ -352,7 +352,7 @@ void TGetImpl::PrepareVPuts(TLogContext &logCtx,
}
bytes += put.Buffer.size();
lastItemCount++;
- vMultiPut->AddVPut(put.Id, put.Buffer, &cookie);
+ vMultiPut->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks);
}
vMultiPut->Record.SetCookie(TVMultiPutCookie(diskOrderNumber, lastItemCount, VMultiPutRequests));
++VMultiPutRequests;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
index df51c9a78e0..e2b717d13f6 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
@@ -45,9 +45,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
NWilson::TTraceId TraceId;
NLWTrace::TOrbit Orbit;
bool Replied = false;
+ std::vector<std::pair<ui64, ui32>> ExtraBlockChecks;
TMultiPutItemInfo(TLogoBlobID id, const TString& buffer, TActorId recipient, ui64 cookie,
- NWilson::TTraceId traceId, NLWTrace::TOrbit &&orbit)
+ NWilson::TTraceId traceId, NLWTrace::TOrbit &&orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks)
: BlobId(id)
, Buffer(buffer)
, BufferSize(buffer.size())
@@ -55,6 +56,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
, Cookie(cookie)
, TraceId(std::move(traceId))
, Orbit(std::move(orbit))
+ , ExtraBlockChecks(std::move(extraBlockChecks))
{}
};
@@ -92,6 +94,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
const bool IsMultiPutMode;
+ bool RequireExtraBlockChecks = false;
+
void SanityCheck() {
if (RequestsSent <= MaxSaneRequests) {
return;
@@ -458,11 +462,13 @@ public:
, IsAccelerated(false)
, IsAccelerateScheduled(false)
, IsMultiPutMode(false)
+ , RequireExtraBlockChecks(!ev->ExtraBlockChecks.empty())
{
if (ev->Orbit.HasShuttles()) {
RootCauseTrack.IsOn = true;
}
- ItemsInfo.emplace_back(ev->Id, ev->Buffer, source, cookie, NWilson::TTraceId(), std::move(ev->Orbit));
+ ItemsInfo.emplace_back(ev->Id, ev->Buffer, source, cookie, NWilson::TTraceId(), std::move(ev->Orbit),
+ std::move(ev->ExtraBlockChecks));
LWPROBE(DSProxyBlobPutTactics, ItemsInfo[0].BlobId.TabletID(), Info->GroupID, ItemsInfo[0].BlobId.ToString(),
Tactic, NKikimrBlobStorage::EPutHandleClass_Name(HandleClass));
ReportBytes(ItemsInfo[0].Buffer.capacity() + sizeof(*this));
@@ -507,17 +513,22 @@ public:
{
Y_VERIFY_DEBUG(events.size() <= MaxBatchedPutRequests);
for (auto &ev : events) {
- Deadline = Max(Deadline, ev->Get()->Deadline);
- if (ev->Get()->Orbit.HasShuttles()) {
+ auto& msg = *ev->Get();
+ Deadline = Max(Deadline, msg.Deadline);
+ if (msg.Orbit.HasShuttles()) {
RootCauseTrack.IsOn = true;
}
+ if (!msg.ExtraBlockChecks.empty()) {
+ RequireExtraBlockChecks = true;
+ }
ItemsInfo.emplace_back(
- ev->Get()->Id,
- ev->Get()->Buffer,
+ msg.Id,
+ msg.Buffer,
ev->Sender,
ev->Cookie,
std::move(ev->TraceId),
- std::move(ev->Get()->Orbit)
+ std::move(msg.Orbit),
+ std::move(msg.ExtraBlockChecks)
);
LWPROBE(DSProxyBlobPutTactics, ItemsInfo.back().BlobId.TabletID(), Info->GroupID,
ItemsInfo.back().BlobId.ToString(), Tactic, NKikimrBlobStorage::EPutHandleClass_Name(HandleClass));
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp
index 70b774636ca..757b313cfb0 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp
@@ -59,15 +59,15 @@ void TPutImpl::PrepareOneReply(NKikimrProto::EReplyStatus status, TLogoBlobID bl
void TPutImpl::PrepareReply(NKikimrProto::EReplyStatus status, TLogContext &logCtx, TString errorReason,
TPutResultVec &outPutResults) {
A_LOG_DEBUG_SX(logCtx, "BPP34", "PrepareReply status# " << status << " errorReason# " << errorReason);
- for (ui64 idx = 0; idx < BlobIds.size(); ++idx) {
+ for (ui64 idx = 0; idx < Blobs.size(); ++idx) {
if (IsDone[idx]) {
- A_LOG_DEBUG_SX(logCtx, "BPP35", "blob# " << BlobIds[idx].ToString() <<
+ A_LOG_DEBUG_SX(logCtx, "BPP35", "blob# " << Blobs[idx].ToString() <<
" idx# " << idx << " is sent, skipped");
continue;
}
- outPutResults.emplace_back(idx, new TEvBlobStorage::TEvPutResult(status, BlobIds[idx], StatusFlags, Info->GroupID,
- ApproximateFreeSpaceShare));
+ outPutResults.emplace_back(idx, new TEvBlobStorage::TEvPutResult(status, Blobs[idx].Id, StatusFlags,
+ Info->GroupID, ApproximateFreeSpaceShare));
outPutResults.back().second->ErrorReason = errorReason;
NLog::EPriority priority = GetPriorityForReply(Info->PutErrorMuteChecker, status);
@@ -87,7 +87,7 @@ void TPutImpl::PrepareReply(TLogContext &logCtx, TString errorReason,
for (auto item : finished) {
auto &[blobId, state] = *item;
const ui64 idx = state.BlobIdx;
- Y_VERIFY(blobId == BlobIds[idx], "BlobIdx# %" PRIu64 " BlobState# %s Blackboard# %s",
+ Y_VERIFY(blobId == Blobs[idx].Id, "BlobIdx# %" PRIu64 " BlobState# %s Blackboard# %s",
idx, state.ToString().c_str(), Blackboard.ToString().c_str());
Y_VERIFY(!IsDone[idx]);
Y_VERIFY(state.Status != NKikimrProto::UNKNOWN);
@@ -126,7 +126,7 @@ TString TPutImpl::DumpFullState() const {
str << Endl;
str << " Blackboard# " << Blackboard.ToString();
str << Endl;
- str << " BlobIds# " << BlobIds.ToString();
+ str << " Blobs# " << Blobs.ToString();
str << Endl;
str << "IsDone# " << IsDone.ToString();
str << Endl;
@@ -147,12 +147,16 @@ TString TPutImpl::DumpFullState() const {
}
bool TPutImpl::MarkBlobAsSent(ui64 idx) {
- Y_VERIFY(idx < BlobIds.size());
+ Y_VERIFY(idx < Blobs.size());
Y_VERIFY(!IsDone[idx]);
- Blackboard.MoveBlobStateToDone(BlobIds[idx]);
+ Blackboard.MoveBlobStateToDone(Blobs[idx].Id);
IsDone[idx] = true;
DoneBlobs++;
return true;
}
}//NKikimr
+
+Y_DECLARE_OUT_SPEC(, NKikimr::TPutImpl::TBlobInfo, stream, value) {
+ value.Output(stream);
+}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
index b8b74c1cde7..b95d0cf3485 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
@@ -44,7 +44,31 @@ private:
const TEvBlobStorage::TEvPut::ETactic Tactic;
- TBatchedVec<TLogoBlobID> BlobIds;
+ struct TBlobInfo {
+ TLogoBlobID Id;
+ std::vector<std::pair<ui64, ui32>> ExtraBlockChecks;
+
+ void Output(IOutputStream& s) const {
+ s << Id;
+ if (!ExtraBlockChecks.empty()) {
+ s << "{";
+ for (auto it = ExtraBlockChecks.begin(); it != ExtraBlockChecks.end(); ++it) {
+ if (it != ExtraBlockChecks.begin()) {
+ s << ", ";
+ }
+ s << it->first << ":" << it->second;
+ }
+ s << "}";
+ }
+ }
+
+ TString ToString() const {
+ TStringStream s;
+ Output(s);
+ return s.Str();
+ }
+ };
+ TBatchedVec<TBlobInfo> Blobs;
TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVPutResponses;
TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVMultiPutResponses;
@@ -53,6 +77,8 @@ private:
TString ErrorDescription;
+ friend void ::Out<TBlobInfo>(IOutputStream&, const TBlobInfo&);
+
public:
TPutImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state,
TEvBlobStorage::TEvPut *ev, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon,
@@ -66,10 +92,10 @@ public:
, Mon(mon)
, EnableRequestMod3x3ForMinLatecy(enableRequestMod3x3ForMinLatecy)
, Tactic(ev->Tactic)
- , BlobIds({ev->Id})
+ , Blobs({{ev->Id, std::move(ev->ExtraBlockChecks)}})
{
- Y_VERIFY(BlobIds.size());
- Y_VERIFY(BlobIds.size() <= MaxBatchedPutRequests);
+ Y_VERIFY(Blobs.size());
+ Y_VERIFY(Blobs.size() <= MaxBatchedPutRequests);
}
TPutImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state,
@@ -88,13 +114,14 @@ public:
{
Y_VERIFY(events.size(), "TEvPut vector is empty");
for (auto &ev : events) {
- Y_VERIFY(ev->Get()->HandleClass == putHandleClass);
- Y_VERIFY(ev->Get()->Tactic == tactic);
- BlobIds.push_back(ev->Get()->Id);
- Deadline = Max(Deadline, ev->Get()->Deadline);
+ auto& msg = *ev->Get();
+ Y_VERIFY(msg.HandleClass == putHandleClass);
+ Y_VERIFY(msg.Tactic == tactic);
+ Blobs.push_back({msg.Id, std::move(msg.ExtraBlockChecks)});
+ Deadline = Max(Deadline, msg.Deadline);
}
- Y_VERIFY(BlobIds.size());
- Y_VERIFY(BlobIds.size() <= MaxBatchedPutRequests);
+ Y_VERIFY(Blobs.size());
+ Y_VERIFY(Blobs.size() <= MaxBatchedPutRequests);
}
NKikimrBlobStorage::EPutHandleClass GetPutHandleClass() const {
@@ -109,17 +136,17 @@ public:
void GenerateInitialRequests(TLogContext &logCtx, TBatchedVec<TDataPartSet> &partSets,
TDeque<std::unique_ptr<TVPutEvent>> &outVPuts) {
Y_UNUSED(logCtx);
- Y_VERIFY_S(partSets.size() == BlobIds.size(), "partSets.size# " << partSets.size()
- << " BlobIds.size# " << BlobIds.size());
+ Y_VERIFY_S(partSets.size() == Blobs.size(), "partSets.size# " << partSets.size()
+ << " Blobs.size# " << Blobs.size());
const ui32 totalParts = Info->Type.TotalPartCount();
- for (ui64 blobIdx = 0; blobIdx < BlobIds.size(); ++blobIdx) {
- TLogoBlobID &blobId = BlobIds[blobIdx];
+ for (ui64 blobIdx = 0; blobIdx < Blobs.size(); ++blobIdx) {
+ TBlobInfo& blob = Blobs[blobIdx];
for (ui32 i = 0; i < totalParts; ++i) {
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(partSets[blobIdx].Parts[i].OwnedString.Data(),
partSets[blobIdx].Parts[i].OwnedString.Size());
- Blackboard.AddPartToPut(blobId, i, partSets[blobIdx].Parts[i].OwnedString);
+ Blackboard.AddPartToPut(blob.Id, i, partSets[blobIdx].Parts[i].OwnedString, &blob.ExtraBlockChecks);
}
- Blackboard.MarkBlobReadyToPut(blobId, blobIdx);
+ Blackboard.MarkBlobReadyToPut(blob.Id, blobIdx);
}
TPutResultVec putResults;
@@ -322,7 +349,7 @@ public:
}
Step(logCtx, outVPutEvents, outPutResults);
- Y_VERIFY_S(DoneBlobs == BlobIds.size() || requests > responses,
+ Y_VERIFY_S(DoneBlobs == Blobs.size() || requests > responses,
"No put result while"
<< " Type# " << putType
<< " DoneBlobs# " << DoneBlobs
@@ -420,13 +447,21 @@ protected:
if constexpr (isVPut) {
auto vPut = std::make_unique<TEvBlobStorage::TEvVPut>(put.Id, put.Buffer, vDiskId, false, &cookie,
Deadline, Blackboard.PutHandleClass);
+ auto& record = vPut->Record;
+ if (put.ExtraBlockChecks) {
+ for (const auto& [tabletId, generation] : *put.ExtraBlockChecks) {
+ auto *p = record.AddExtraBlockChecks();
+ p->SetTabletId(tabletId);
+ p->SetGeneration(generation);
+ }
+ }
R_LOG_DEBUG_SX(logCtx, "BPP20", "Send put to orderNumber# " << diskOrderNumber << " idx# " << idx
<< " vPut# " << vPut->ToString());
outVPutEvents.push_back(std::move(vPut));
++VPutRequests;
ReceivedVPutResponses.push_back(false);
} else if constexpr (isVMultiPut) {
- outVPutEvents.back()->AddVPut(put.Id, put.Buffer, &cookie);
+ outVPutEvents.back()->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks);
}
if (put.IsHandoff) {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp
index 9cf4b55e589..f2dcdce5881 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp
@@ -205,7 +205,8 @@ namespace NKikimr {
Y_VERIFY(Sessions);
auto *msg = ev->Get();
Y_VERIFY(Topology);
- Sessions->QueueConnectUpdate(Topology->GetOrderNumber(msg->VDiskId), msg->QueueId, msg->IsConnected);
+ Sessions->QueueConnectUpdate(Topology->GetOrderNumber(msg->VDiskId), msg->QueueId, msg->IsConnected,
+ msg->ExtraBlockChecksSupport, *Topology);
if (msg->IsConnected && (CurrentStateFunc() == &TThis::StateEstablishingSessions ||
CurrentStateFunc() == &TThis::StateEstablishingSessionsTimeout)) {
SwitchToWorkWhenGoodToGo();
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp
index c73c20712ec..89304e2cd34 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp
@@ -333,7 +333,8 @@ void TStrategyBase::PreparePutsForPartPlacement(TLogContext &logCtx, TBlobState
<< " blob Id# " << partId.ToString());
Y_VERIFY(state.Parts[record.PartIdx].Data.IsMonolith());
groupDiskRequests.AddPut(disk.OrderNumber, partId, state.Parts[record.PartIdx].Data.GetMonolith(),
- TDiskPutRequest::ReasonInitial, info.Type.IsHandoffInSubgroup(record.VDiskIdx), state.BlobIdx);
+ TDiskPutRequest::ReasonInitial, info.Type.IsHandoffInSubgroup(record.VDiskIdx),
+ state.ExtraBlockChecks, state.BlobIdx);
disk.DiskParts[record.PartIdx].Situation = TBlobState::ESituation::Sent;
}
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h
index b372e891ebd..5ef2c7ddf4d 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h
@@ -140,6 +140,7 @@ protected:
GetDataBuffer(state, info),
diskIdx == group.DiskIdx[0] ? TDiskPutRequest::ReasonInitial : TDiskPutRequest::ReasonError,
diskIdx != group.DiskIdx[0],
+ state.ExtraBlockChecks,
state.BlobIdx);
s = TBlobState::ESituation::Sent;
any |= {&info.GetTopology(), diskIdx};
@@ -172,6 +173,7 @@ protected:
TString(),
handoff ? TDiskPutRequest::ReasonError : TDiskPutRequest::ReasonInitial,
handoff,
+ state.ExtraBlockChecks,
state.BlobIdx);
part.Situation = TBlobState::ESituation::Sent;
any |= {&info.GetTopology(), (ui8)diskIdx};
diff --git a/ydb/core/blobstorage/dsproxy/group_sessions.cpp b/ydb/core/blobstorage/dsproxy/group_sessions.cpp
index 9923c6b7843..27223501628 100644
--- a/ydb/core/blobstorage/dsproxy/group_sessions.cpp
+++ b/ydb/core/blobstorage/dsproxy/group_sessions.cpp
@@ -85,6 +85,7 @@ TGroupSessions::TGroupSessions(const TIntrusivePtr<TBlobStorageGroupInfo>& info,
auto& q = stateVDisk.Queues.GetQueue(queueId);
q.ActorId = queue;
q.FlowRecord = std::move(flowRecord);
+ q.ExtraBlockChecksSupport = false;
}
}
}
@@ -114,11 +115,18 @@ bool TGroupSessions::GoodToGo(const TBlobStorageGroupInfo::TTopology& topology,
: topology.GetQuorumChecker().CheckQuorumForGroup(connected);
}
-void TGroupSessions::QueueConnectUpdate(ui32 orderNumber, NKikimrBlobStorage::EVDiskQueueId queueId, bool connected) {
+void TGroupSessions::QueueConnectUpdate(ui32 orderNumber, NKikimrBlobStorage::EVDiskQueueId queueId, bool connected,
+ bool extraGroupChecksSupport, const TBlobStorageGroupInfo::TTopology& topology) {
+ const auto v = topology.GetVDiskId(orderNumber);
+ const ui32 fdom = topology.GetFailDomainOrderNumber(v);
+ auto& q = GroupQueues->FailDomains[fdom].VDisks[v.VDisk].Queues.GetQueue(queueId);
+
if (connected) {
ConnectedQueuesMask[orderNumber] |= 1 << queueId;
+ q.ExtraBlockChecksSupport = extraGroupChecksSupport;
} else {
ConnectedQueuesMask[orderNumber] &= ~(1 << queueId);
+ q.ExtraBlockChecksSupport = false;
}
}
diff --git a/ydb/core/blobstorage/dsproxy/group_sessions.h b/ydb/core/blobstorage/dsproxy/group_sessions.h
index 9284051408a..120ad2da4c9 100644
--- a/ydb/core/blobstorage/dsproxy/group_sessions.h
+++ b/ydb/core/blobstorage/dsproxy/group_sessions.h
@@ -21,6 +21,7 @@ namespace NKikimr {
struct TQueue {
TActorId ActorId;
TIntrusivePtr<NBackpressure::TFlowRecord> FlowRecord;
+ bool ExtraBlockChecksSupport;
};
TQueue PutTabletLog;
TQueue PutAsyncBlob;
@@ -65,10 +66,6 @@ namespace NKikimr {
}
}
- TActorId& QueueForQueueId(NKikimrBlobStorage::EVDiskQueueId queueId) {
- return GetQueue(queueId).ActorId;
- }
-
TIntrusivePtr<NBackpressure::TFlowRecord>& FlowRecordForQueueId(NKikimrBlobStorage::EVDiskQueueId queueId) {
return GetQueue(queueId).FlowRecord;
}
@@ -78,9 +75,25 @@ namespace NKikimr {
return flowRecord ? flowRecord->GetPredictedDelayNs() : 0;
}
+ template<typename T>
+ static void ValidateEvent(TQueue& /*queue*/, const T& /*event*/)
+ {}
+
+ static void ValidateEvent(TQueue& queue, const TEvBlobStorage::TEvVPut& event) {
+ Y_VERIFY(!event.Record.ExtraBlockChecksSize() || queue.ExtraBlockChecksSupport);
+ }
+
+ static void ValidateEvent(TQueue& queue, const TEvBlobStorage::TEvVMultiPut& event) {
+ for (const auto& item : event.Record.GetItems()) {
+ Y_VERIFY(!item.ExtraBlockChecksSize() || queue.ExtraBlockChecksSupport);
+ }
+ }
+
template<typename TEvent>
TActorId QueueForEvent(const TEvent& event) {
- return QueueForQueueId(VDiskQueueId(event));
+ TQueue& queue = GetQueue(VDiskQueueId(event));
+ ValidateEvent(queue, event);
+ return queue.ActorId;
}
TString ToString() const {
@@ -220,7 +233,8 @@ namespace NKikimr {
const TActorId& monActor, const TActorId& proxyActor);
void Poison();
bool GoodToGo(const TBlobStorageGroupInfo::TTopology& topology, bool waitForAllVDisks);
- void QueueConnectUpdate(ui32 orderNumber, NKikimrBlobStorage::EVDiskQueueId queueId, bool connected);
+ void QueueConnectUpdate(ui32 orderNumber, NKikimrBlobStorage::EVDiskQueueId queueId, bool connected,
+ bool extraBlockChecksSupport, const TBlobStorageGroupInfo::TTopology& topology);
ui32 GetNumUnconnectedDisks();
};
diff --git a/ydb/core/blobstorage/ut_blobstorage/CMakeLists.darwin.txt b/ydb/core/blobstorage/ut_blobstorage/CMakeLists.darwin.txt
index 20737cdd499..9bcfefe2f87 100644
--- a/ydb/core/blobstorage/ut_blobstorage/CMakeLists.darwin.txt
+++ b/ydb/core/blobstorage/ut_blobstorage/CMakeLists.darwin.txt
@@ -34,6 +34,7 @@ target_sources(ydb-core-blobstorage-ut_blobstorage PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/decommit_3dc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/defrag.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/encryption.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/gc_quorum_3dc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/main.cpp
diff --git a/ydb/core/blobstorage/ut_blobstorage/CMakeLists.linux.txt b/ydb/core/blobstorage/ut_blobstorage/CMakeLists.linux.txt
index c29b009099a..3ac6ede19a9 100644
--- a/ydb/core/blobstorage/ut_blobstorage/CMakeLists.linux.txt
+++ b/ydb/core/blobstorage/ut_blobstorage/CMakeLists.linux.txt
@@ -38,6 +38,7 @@ target_sources(ydb-core-blobstorage-ut_blobstorage PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/decommit_3dc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/defrag.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/encryption.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/gc_quorum_3dc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/main.cpp
diff --git a/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp b/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp
new file mode 100644
index 00000000000..cac7bfd7f3f
--- /dev/null
+++ b/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp
@@ -0,0 +1,62 @@
+#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
+
+Y_UNIT_TEST_SUITE(ExtraBlockChecks) {
+ Y_UNIT_TEST(Basic) {
+ TEnvironmentSetup env(TEnvironmentSetup::TSettings{
+ .NodeCount = 8,
+ .Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
+ });
+
+ auto& runtime = env.Runtime;
+
+ env.CreateBoxAndPool(1, 1);
+ auto groups = env.GetGroups();
+ UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1);
+ const TIntrusivePtr<TBlobStorageGroupInfo> info = env.GetGroupInfo(groups.front());
+
+ const auto& edge = runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
+ runtime->WrapInActorContext(edge, [&] {
+ SendToBSProxy(edge, info->GroupID, new TEvBlobStorage::TEvBlock(1, 10, TInstant::Max()));
+ SendToBSProxy(edge, info->GroupID, new TEvBlobStorage::TEvBlock(2, 10, TInstant::Max()));
+ });
+ {
+ auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvBlockResult>(edge, false);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
+ res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvBlockResult>(edge, false);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
+ }
+
+ const TString data = "data";
+
+ runtime->WrapInActorContext(edge, [&] {
+ auto ev = std::make_unique<TEvBlobStorage::TEvPut>(TLogoBlobID(1, 11, 1, 0, data.size(), 1), data, TInstant::Max());
+ ev->ExtraBlockChecks.emplace_back(2, 10);
+ SendToBSProxy(edge, info->GroupID, ev.release());
+ });
+ {
+ auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge, false);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::BLOCKED);
+ }
+
+ runtime->WrapInActorContext(edge, [&] {
+ SendToBSProxy(edge, info->GroupID, new TEvBlobStorage::TEvPut(TLogoBlobID(1, 11, 1, 0, data.size(), 2), data, TInstant::Max()));
+ });
+ {
+ auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge, false);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
+ }
+
+ runtime->WrapInActorContext(edge, [&] {
+ auto ev = std::make_unique<TEvBlobStorage::TEvPut>(TLogoBlobID(1, 11, 1, 0, data.size(), 3), data, TInstant::Max());
+ ev->ExtraBlockChecks.emplace_back(2, 10);
+ SendToBSProxy(edge, info->GroupID, ev.release());
+ SendToBSProxy(edge, info->GroupID, new TEvBlobStorage::TEvPut(TLogoBlobID(1, 11, 1, 0, data.size(), 4), data, TInstant::Max()));
+ });
+ {
+ auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge, false);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::BLOCKED);
+ res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge, false);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
+ }
+ }
+}
diff --git a/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp b/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp
index 1e1bca2fe9c..2e51f3e5bfc 100644
--- a/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp
@@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(IncorrectQueries) {
TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr));
for(auto [blob, data, status] : blobs) {
- static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, 0);
+ static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, nullptr, nullptr);
}
static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->Record = proto;
@@ -105,7 +105,7 @@ Y_UNIT_TEST_SUITE(IncorrectQueries) {
TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr));
for(auto [blob, data, status] : blobs) {
- static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, 0);
+ static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, nullptr, nullptr);
}
env.WithQueueId(test.Info->GetVDiskInSubgroup(0, blobs[0].BlobId.Hash()), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) {
@@ -528,7 +528,7 @@ Y_UNIT_TEST_SUITE(IncorrectQueries) {
if (i % 19 != 18) {
++goodCount;
TLogoBlobID blob(i, 1, 0, 0, blobSize, 0, 1);
- static_cast<TEvBlobStorage::TEvVMultiPut*>(events[i].get())->AddVPut(blob, data, 0);
+ static_cast<TEvBlobStorage::TEvVMultiPut*>(events[i].get())->AddVPut(blob, data, nullptr, nullptr);
}
}
diff --git a/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp b/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp
index b73da3df8db..f7bd796a008 100644
--- a/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp
+++ b/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp
@@ -448,7 +448,7 @@ class TManyMultiPuts : public TActorBootstrapped<TManyMultiPuts> {
TVDiskIdShort mainVDiskId = TIngress::GetMainReplica(&Conf->GroupInfo->GetTopology(), logoBlobID);
if (mainVDiskId == VDiskInfo.VDiskID) {
ui64 cookieValue = Step;
- vMultiPut->AddVPut(logoBlobID, MsgData, &cookieValue);
+ vMultiPut->AddVPut(logoBlobID, MsgData, &cookieValue, nullptr);
putCount++;
Step++;
diff --git a/ydb/core/blobstorage/ut_vdisk/lib/test_huge.cpp b/ydb/core/blobstorage/ut_vdisk/lib/test_huge.cpp
index 9940e6da358..0e41b8b27f4 100644
--- a/ydb/core/blobstorage/ut_vdisk/lib/test_huge.cpp
+++ b/ydb/core/blobstorage/ut_vdisk/lib/test_huge.cpp
@@ -60,7 +60,7 @@ public:
new TEvHullWriteHugeBlob(TActorId(), 0, logoBlobId, TIngress(),
TRope(abcdefghkj),
false, NKikimrBlobStorage::EPutHandleClass::AsyncBlob,
- std::make_unique<TEvBlobStorage::TEvVPutResult>()));
+ std::make_unique<TEvBlobStorage::TEvVPutResult>(), nullptr));
State = 1;
return false;
}
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
index bfd260b0040..dc52a49d022 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
@@ -836,7 +836,8 @@ namespace NKikimr {
TRope GetItemBuffer(ui64 itemIdx) const;
- void AddVPut(const TLogoBlobID &logoBlobId, const TString &buffer, ui64 *cookie) {
+ void AddVPut(const TLogoBlobID &logoBlobId, const TString &buffer, ui64 *cookie,
+ std::vector<std::pair<ui64, ui32>> *extraBlockChecks) {
NKikimrBlobStorage::TVMultiPutItem *item = Record.AddItems();
LogoBlobIDFromLogoBlobID(logoBlobId, item->MutableBlobID());
item->SetFullDataSize(logoBlobId.BlobSize());
@@ -845,6 +846,13 @@ namespace NKikimr {
if (cookie) {
item->SetCookie(*cookie);
}
+ if (extraBlockChecks) {
+ for (const auto& [tabletId, generation] : *extraBlockChecks) {
+ auto *p = item->AddExtraBlockChecks();
+ p->SetTabletId(tabletId);
+ p->SetGeneration(generation);
+ }
+ }
}
bool Validate(TString& errorReason) {
@@ -2361,6 +2369,7 @@ namespace NKikimr {
TEvVCheckReadinessResult(NKikimrProto::EReplyStatus status) {
Record.SetStatus(status);
+ Record.SetExtraBlockChecksSupport(true);
}
};
diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
index 989c5e920c5..6c154281eb8 100644
--- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
+++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
@@ -201,7 +201,8 @@ namespace NKikimr {
ctx.Send(NotifyID, new TEvHullHugeWritten(HugeSlot));
ctx.Send(HugeKeeperCtx->SkeletonId,
new TEvHullLogHugeBlob(WriteId, Item->LogoBlobId, Item->Ingress, DiskAddr,
- Item->IgnoreBlock, Item->SenderId, Item->Cookie, std::move(Item->Result)));
+ Item->IgnoreBlock, Item->SenderId, Item->Cookie, std::move(Item->Result),
+ &Item->ExtraBlockChecks));
LOG_DEBUG(ctx, BS_HULLHUGE,
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
"Writer: finish: id# %s diskAddr# %s",
diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
index ae6d7536eab..1876855d797 100644
--- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
+++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
@@ -21,6 +21,7 @@ namespace NKikimr {
const bool IgnoreBlock;
const NKikimrBlobStorage::EPutHandleClass HandleClass;
std::unique_ptr<TEvBlobStorage::TEvVPutResult> Result;
+ NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> ExtraBlockChecks;
TEvHullWriteHugeBlob(const TActorId &senderId,
ui64 cookie,
@@ -29,7 +30,8 @@ namespace NKikimr {
TRope&& data,
bool ignoreBlock,
NKikimrBlobStorage::EPutHandleClass handleClass,
- std::unique_ptr<TEvBlobStorage::TEvVPutResult> result)
+ std::unique_ptr<TEvBlobStorage::TEvVPutResult> result,
+ NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks)
: SenderId(senderId)
, Cookie(cookie)
, LogoBlobId(logoBlobId)
@@ -38,7 +40,11 @@ namespace NKikimr {
, IgnoreBlock(ignoreBlock)
, HandleClass(handleClass)
, Result(std::move(result))
- {}
+ {
+ if (extraBlockChecks) {
+ ExtraBlockChecks.Swap(extraBlockChecks);
+ }
+ }
ui64 ByteSize() const {
return Data.GetSize();
@@ -64,6 +70,7 @@ namespace NKikimr {
const TActorId OrigClient;
const ui64 OrigCookie;
std::unique_ptr<TEvBlobStorage::TEvVPutResult> Result;
+ NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> ExtraBlockChecks;
TEvHullLogHugeBlob(ui64 writeId,
const TLogoBlobID &logoBlobID,
@@ -72,7 +79,8 @@ namespace NKikimr {
bool ignoreBlock,
const TActorId &origClient,
ui64 origCookie,
- std::unique_ptr<TEvBlobStorage::TEvVPutResult> result)
+ std::unique_ptr<TEvBlobStorage::TEvVPutResult> result,
+ NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks)
: WriteId(writeId)
, LogoBlobID(logoBlobID)
, Ingress(ingress)
@@ -81,8 +89,11 @@ namespace NKikimr {
, OrigClient(origClient)
, OrigCookie(origCookie)
, Result(std::move(result))
-
- {}
+ {
+ if (extraBlockChecks) {
+ ExtraBlockChecks.Swap(extraBlockChecks);
+ }
+ }
};
////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
index 33a8e1153b5..58c881fb027 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
@@ -161,7 +161,8 @@ namespace NKikimr {
THullCheckStatus THull::CheckLogoBlob(
const TActorContext &ctx,
const TLogoBlobID &id,
- bool ignoreBlock)
+ bool ignoreBlock,
+ const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks)
{
// check blocked
if (!ignoreBlock) {
@@ -174,6 +175,18 @@ namespace NKikimr {
case TBlocksCache::EStatus::BLOCKED_INFLIGH:
return {NKikimrProto::BLOCKED, "blocked", res.Lsn, true};
}
+
+ for (const auto& item : extraBlockChecks) {
+ auto res = BlocksCache.IsBlocked(item.GetTabletId(), {item.GetGeneration(), 0});
+ switch (res.Status) {
+ case TBlocksCache::EStatus::OK:
+ break;
+ case TBlocksCache::EStatus::BLOCKED_PERS:
+ return {NKikimrProto::BLOCKED, "blocked", 0, false};
+ case TBlocksCache::EStatus::BLOCKED_INFLIGH:
+ return {NKikimrProto::BLOCKED, "blocked", res.Lsn, true};
+ }
+ }
}
ValidateWriteQuery(ctx, id);
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h
index 309c3d52c2f..60c68a51114 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h
@@ -92,7 +92,8 @@ namespace NKikimr {
THullCheckStatus CheckLogoBlob(
const TActorContext &ctx,
const TLogoBlobID &id,
- bool ignoreBlock);
+ bool ignoreBlock,
+ const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks);
void AddLogoBlob(
const TActorContext &ctx,
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index a5f5483bbdc..e89088f0864 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -301,12 +301,16 @@ namespace NKikimr {
TLsnSeg Lsn = {};
THullCheckStatus HullStatus;
bool IsHugeBlob = false;
+ NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> ExtraBlockChecks;
- TVPutInfo(TLogoBlobID blobId, TRope &&buffer)
+ TVPutInfo(TLogoBlobID blobId, TRope &&buffer,
+ NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks)
: Buffer(std::move(buffer))
, BlobId(blobId)
- , HullStatus({NKikimrProto::UNKNOWN, 0 ,false})
- {}
+ , HullStatus({NKikimrProto::UNKNOWN, 0, false})
+ {
+ ExtraBlockChecks.Swap(extraBlockChecks);
+ }
};
void UpdatePDiskWriteBytes(size_t size) {
@@ -378,11 +382,12 @@ namespace NKikimr {
std::move(info.Buffer), *Arena);
UpdatePDiskWriteBytes(info.Buffer.GetSize());
return std::make_unique<TEvHullWriteHugeBlob>(sender, cookie, info.BlobId, info.Ingress,
- std::move(info.Buffer), ignoreBlock, handleClass, std::move(res));
+ std::move(info.Buffer), ignoreBlock, handleClass, std::move(res), &info.ExtraBlockChecks);
}
THullCheckStatus ValidateVPut(const TActorContext &ctx, TString evPrefix,
- TLogoBlobID id, ui64 bufSize, bool ignoreBlock)
+ TLogoBlobID id, ui64 bufSize, bool ignoreBlock,
+ const NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck>& extraBlockChecks)
{
ui64 blobPartSize = 0;
try {
@@ -411,7 +416,7 @@ namespace NKikimr {
return {NKikimrProto::ERROR, "buffer is too large"};
}
- auto status = Hull->CheckLogoBlob(ctx, id, ignoreBlock);
+ auto status = Hull->CheckLogoBlob(ctx, id, ignoreBlock, extraBlockChecks);
if (status.Status != NKikimrProto::OK) {
LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << evPrefix << ": failed to pass the Hull check;"
<< " id# " << id
@@ -475,9 +480,9 @@ namespace NKikimr {
TBatchedVec<TVPutInfo> putsInfo;
ui64 lsnCount = 0;
for (ui64 itemIdx = 0; itemIdx < record.ItemsSize(); ++itemIdx) {
- auto &item = record.GetItems(itemIdx);
+ auto &item = *record.MutableItems(itemIdx);
TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID());
- putsInfo.emplace_back(blobId, ev->Get()->GetItemBuffer(itemIdx));
+ putsInfo.emplace_back(blobId, ev->Get()->GetItemBuffer(itemIdx), item.MutableExtraBlockChecks());
TVPutInfo &info = putsInfo.back();
try {
@@ -488,7 +493,8 @@ namespace NKikimr {
}
if (info.HullStatus.Status == NKikimrProto::UNKNOWN) {
- info.HullStatus = ValidateVPut(ctx, "TEvVMultiPut", blobId, info.Buffer.GetSize(), ignoreBlock);
+ info.HullStatus = ValidateVPut(ctx, "TEvVMultiPut", blobId, info.Buffer.GetSize(), ignoreBlock,
+ info.ExtraBlockChecks);
}
if (info.HullStatus.Status == NKikimrProto::OK) {
@@ -587,7 +593,7 @@ namespace NKikimr {
ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, NWilson::TTraceId(traceId));
} else {
ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(),
- ignoreBlock, vMultiPutActorId, cookie, std::move(result)));
+ ignoreBlock, vMultiPutActorId, cookie, std::move(result), &info.ExtraBlockChecks));
}
} else {
Y_VERIFY(lsnBatch.First <= lsnBatch.Last);
@@ -648,7 +654,7 @@ namespace NKikimr {
const TLogoBlobID id = LogoBlobIDFromLogoBlobID(record.GetBlobID());
LWTRACK(VDiskSkeletonVPutRecieved, ev->Get()->Orbit, VCtx->NodeId, VCtx->GroupId,
VCtx->Top->GetFailDomainOrderNumber(VCtx->ShortSelfVDisk), id.TabletID(), id.BlobSize());
- TVPutInfo info(id, ev->Get()->GetBuffer());
+ TVPutInfo info(id, ev->Get()->GetBuffer(), record.MutableExtraBlockChecks());
const ui64 bufSize = info.Buffer.GetSize();
try {
@@ -674,7 +680,7 @@ namespace NKikimr {
return;
}
- info.HullStatus = ValidateVPut(ctx, "TEvVPut", id, bufSize, ignoreBlock);
+ info.HullStatus = ValidateVPut(ctx, "TEvVPut", id, bufSize, ignoreBlock, info.ExtraBlockChecks);
if (info.HullStatus.Status != NKikimrProto::OK) {
ReplyError(info.HullStatus, ev, ctx, now);
return;
@@ -720,7 +726,7 @@ namespace NKikimr {
ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, std::move(ev->TraceId));
} else {
ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(),
- ignoreBlock, ev->Sender, ev->Cookie, std::move(result)));
+ ignoreBlock, ev->Sender, ev->Cookie, std::move(result), &info.ExtraBlockChecks));
}
}
@@ -729,7 +735,7 @@ namespace NKikimr {
// update hull write duration
msg->Result->MarkHugeWriteTime();
- auto status = Hull->CheckLogoBlob(ctx, msg->LogoBlobID, msg->IgnoreBlock);
+ auto status = Hull->CheckLogoBlob(ctx, msg->LogoBlobID, msg->IgnoreBlock, msg->ExtraBlockChecks);
if (status.Status != NKikimrProto::OK) {
msg->Result->UpdateStatus(status.Status); // modify status in result
LOG_DEBUG_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix
@@ -1541,9 +1547,10 @@ namespace NKikimr {
TIngress ingress = *TIngress::CreateIngressWithLocal(VCtx->Top.get(), SelfVDiskId, id);
if (buf) {
ctx.Send(Db->HugeKeeperID, new TEvHullWriteHugeBlob(ev->Sender, ev->Cookie, id, ingress, std::move(buf),
- true, NKikimrBlobStorage::EPutHandleClass::AsyncBlob, std::move(result)));
+ true, NKikimrBlobStorage::EPutHandleClass::AsyncBlob, std::move(result), nullptr));
} else {
- ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, id, ingress, TDiskPart(), true, ev->Sender, ev->Cookie, std::move(result)));
+ ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, id, ingress, TDiskPart(), true, ev->Sender, ev->Cookie,
+ std::move(result), nullptr));
}
}
diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto
index 632eda54dcb..d9f07a932ac 100644
--- a/ydb/core/protos/blobstorage.proto
+++ b/ydb/core/protos/blobstorage.proto
@@ -356,6 +356,11 @@ message TEvVInplacePatchResult {
}
message TEvVPut {
+ message TExtraBlockCheck {
+ optional fixed64 TabletId = 1;
+ optional uint32 Generation = 2;
+ }
+
optional NKikimrProto.TLogoBlobID BlobID = 1;
optional bytes Buffer = 2;
@@ -368,6 +373,8 @@ message TEvVPut {
optional EPutHandleClass HandleClass = 9;
optional TMsgQoS MsgQoS = 10;
optional TTimestamps Timestamps = 23;
+
+ repeated TExtraBlockCheck ExtraBlockChecks = 11;
}
message TEvVPutResult {
@@ -393,6 +400,8 @@ message TVMultiPutItem {
optional uint64 FullDataSize = 3;
optional uint64 Cookie = 4;
+
+ repeated TEvVPut.TExtraBlockCheck ExtraBlockChecks = 5;
}
message TEvVMultiPut {
@@ -630,6 +639,7 @@ message TEvVCheckReadinessResult {
//optional bool SupportProtoWithPayload = 2;
optional TMessageId ExpectedMsgId = 3;
optional TVDiskCostSettings CostSettings = 4;
+ optional bool ExtraBlockChecksSupport = 5;
optional TGroupInfo RecentGroup = 100; // filled in by VDisk on RACE when VDisk has newer generation
}