aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-12-07 19:55:02 +0300
committeralexvru <alexvru@ydb.tech>2023-12-07 20:56:15 +0300
commite8437d81a04fbfae5c0de3e701a3c039205eaab1 (patch)
tree0913cdb7b274d69e6c338676151057cc4b83e53a
parent5ddac82c510633e79e4a48d13f708c85765cb5e9 (diff)
downloadydb-e8437d81a04fbfae5c0de3e701a3c039205eaab1.tar.gz
Handle long-lasting Put queries in DS proxy correctly -- part 1 KIKIMR-9016
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp171
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h7
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put.cpp108
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h5
4 files changed, 167 insertions, 124 deletions
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
index 3dc9dba1b4..0de81447ee 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
@@ -92,7 +92,7 @@ void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoB
ui32 shift, TRope&& data, bool keep, bool doNotKeep) {
// Add actual data to Parts
Y_ABORT_UNLESS(id.PartId() != 0);
- ui32 partIdx = id.PartId() - 1;
+ const ui32 partIdx = id.PartId() - 1;
Y_ABORT_UNLESS(partIdx < Parts.size());
const ui32 partSize = info.Type.PartSize(id);
const ui32 dataSize = data.size();
@@ -100,114 +100,89 @@ void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoB
Parts[partIdx].AddResponseData(partSize, shift, std::move(data));
}
IsChanged = true;
+
+ const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash());
+ Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize());
+ TDisk& disk = Disks[diskIdx];
+ Y_ABORT_UNLESS(disk.OrderNumber == orderNumber);
+
// Mark part as present for the disk
- bool isFound = false;
- for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
- TDisk &disk = Disks[diskIdx];
- if (disk.OrderNumber == orderNumber) {
- isFound = true;
- Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
- TDiskPart &diskPart = disk.DiskParts[partIdx];
- //Cerr << Endl << "present diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
- diskPart.Situation = ESituation::Present;
- if (partSize) {
- TIntervalVec<i32> responseInterval(shift, shift + dataSize);
- diskPart.Requested.Subtract(responseInterval);
- }
- break;
- }
+ Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
+ TDiskPart &diskPart = disk.DiskParts[partIdx];
+ diskPart.Situation = ESituation::Present;
+ if (partSize) {
+ TIntervalVec<i32> responseInterval(shift, shift + dataSize);
+ diskPart.Requested.Subtract(responseInterval);
}
- Y_ABORT_UNLESS(isFound);
+
Keep |= keep;
DoNotKeep |= doNotKeep;
}
void TBlobState::AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
- Y_UNUSED(info);
Y_ABORT_UNLESS(id.PartId() != 0);
- ui32 partIdx = id.PartId() - 1;
+ const ui32 partIdx = id.PartId() - 1;
IsChanged = true;
- // Mark part as absent for the disk
- bool isFound = false;
- for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
- TDisk &disk = Disks[diskIdx];
- if (disk.OrderNumber == orderNumber) {
- isFound = true;
- Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
- TDiskPart &diskPart = disk.DiskParts[partIdx];
- //Cerr << Endl << "absent diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
- diskPart.Situation = ESituation::Absent;
- diskPart.Requested.Clear();
- break;
- }
- }
- Y_ABORT_UNLESS(isFound);
+
+ const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash());
+ Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize());
+ TDisk& disk = Disks[diskIdx];
+ Y_ABORT_UNLESS(disk.OrderNumber == orderNumber);
+
+ Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
+ TDiskPart &diskPart = disk.DiskParts[partIdx];
+ diskPart.Situation = ESituation::Absent;
+ diskPart.Requested.Clear();
}
void TBlobState::AddPutOkResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
- Y_UNUSED(info);
Y_ABORT_UNLESS(id.PartId() != 0);
- ui32 partIdx = id.PartId() - 1;
+ const ui32 partIdx = id.PartId() - 1;
IsChanged = true;
- // Mark part as put ok for the disk
- bool isFound = false;
- for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
- TDisk &disk = Disks[diskIdx];
- if (disk.OrderNumber == orderNumber) {
- isFound = true;
- Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
- TDiskPart &diskPart = disk.DiskParts[partIdx];
- //Cerr << Endl << "put ok diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
- diskPart.Situation = ESituation::Present;
- break;
- }
- }
- Y_ABORT_UNLESS(isFound);
+
+ const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash());
+ Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize());
+ TDisk& disk = Disks[diskIdx];
+ Y_ABORT_UNLESS(disk.OrderNumber == orderNumber);
+
+ Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
+ TDiskPart& diskPart = disk.DiskParts[partIdx];
+ diskPart.Situation = ESituation::Present;
}
void TBlobState::AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
- Y_UNUSED(info);
Y_ABORT_UNLESS(id.PartId() != 0);
ui32 partIdx = id.PartId() - 1;
IsChanged = true;
- // Mark part as error for the disk
- bool isFound = false;
- for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
- TDisk &disk = Disks[diskIdx];
- if (disk.OrderNumber == orderNumber) {
- isFound = true;
- Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
- TDiskPart &diskPart = disk.DiskParts[partIdx];
- //Cerr << Endl << "error diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
- diskPart.Situation = ESituation::Error;
- diskPart.Requested.Clear();
- break;
- }
- }
- Y_ABORT_UNLESS(isFound);
+
+ const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash());
+ Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize());
+ TDisk& disk = Disks[diskIdx];
+ Y_ABORT_UNLESS(disk.OrderNumber == orderNumber);
+
+ Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
+ TDiskPart &diskPart = disk.DiskParts[partIdx];
+ diskPart.Situation = ESituation::Error;
+ diskPart.Requested.Clear();
}
void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber,
bool keep, bool doNotKeep) {
- Y_UNUSED(info);
Y_ABORT_UNLESS(id.PartId() != 0);
- ui32 partIdx = id.PartId() - 1;
+ const ui32 partIdx = id.PartId() - 1;
IsChanged = true;
- // Mark part as error for the disk
- bool isFound = false;
- for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
- TDisk &disk = Disks[diskIdx];
- if (disk.OrderNumber == orderNumber) {
- isFound = true;
- Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
- TDiskPart &diskPart = disk.DiskParts[partIdx];
- //Cerr << Endl << "error diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
- diskPart.Situation = ESituation::Lost;
- diskPart.Requested.Clear();
- break;
- }
- }
- Y_ABORT_UNLESS(isFound);
+
+ const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash());
+ Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize());
+ TDisk& disk = Disks[diskIdx];
+ Y_ABORT_UNLESS(disk.OrderNumber == orderNumber);
+
+ Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
+ TDiskPart &diskPart = disk.DiskParts[partIdx];
+ //Cerr << Endl << "error diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
+ diskPart.Situation = ESituation::Lost;
+ diskPart.Requested.Clear();
+
Keep |= keep;
DoNotKeep |= doNotKeep;
}
@@ -236,6 +211,20 @@ void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TG
}
}
+bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const {
+ TSubgroupPartLayout layout;
+ for (ui32 diskIdx = 0, numDisks = Disks.size(); diskIdx < numDisks; ++diskIdx) {
+ const TDisk& disk = Disks[diskIdx];
+ for (ui32 partIdx = 0, numParts = disk.DiskParts.size(); partIdx < numParts; ++partIdx) {
+ const TDiskPart& part = disk.DiskParts[partIdx];
+ if (part.Situation == ESituation::Present && !expired[disk.OrderNumber]) {
+ layout.AddItem(diskIdx, partIdx, info.Type);
+ }
+ }
+ }
+ return info.GetQuorumChecker().GetBlobState(layout, {&info.GetTopology()}) == TBlobStorageGroupInfo::EBS_FULL;
+}
+
TString TBlobState::ToString() const {
TStringStream str;
str << "{Id# " << Id.ToString();
@@ -615,5 +604,21 @@ TString TBlackboard::ToString() const {
return str.Str();
}
+void TBlackboard::InvalidatePartStates(ui32 orderNumber) {
+ const TVDiskID vdiskId = Info->GetVDiskId(orderNumber);
+ for (auto& [id, state] : BlobStates) {
+ Y_ABORT_UNLESS(!state.IsDone);
+ if (const ui32 diskIdx = Info->GetIdxInSubgroup(vdiskId, id.Hash()); diskIdx != Info->Type.BlobSubgroupSize()) {
+ TBlobState::TDisk& disk = state.Disks[diskIdx];
+ for (ui32 partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) {
+ TBlobState::TDiskPart& part = disk.DiskParts[partIdx];
+ if (part.Situation == TBlobState::ESituation::Sent || part.Situation == TBlobState::ESituation::Present) {
+ part.Situation = TBlobState::ESituation::Unknown;
+ state.IsChanged = true;
+ }
+ }
+ }
+ }
+}
}//NKikimr
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
index d61e250f32..f61e226dd6 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
@@ -5,6 +5,7 @@
#include <ydb/core/blobstorage/base/batched_vec.h>
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
+#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_partlayout.h>
#include <ydb/core/util/fragmented_buffer.h>
#include <ydb/core/util/interval_set.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
@@ -63,12 +64,14 @@ struct TBlobState {
struct TDiskPart {
TIntervalSet<i32> Requested;
ESituation Situation = ESituation::Unknown;
+
TString ToString() const;
};
struct TDisk {
ui32 OrderNumber;
bool IsSlow = false;
TStackVec<TDiskPart, TypicalPartsInBlob> DiskParts;
+
TString ToString() const;
};
@@ -105,6 +108,8 @@ struct TBlobState {
NKikimrBlobStorage::EVDiskQueueId queueId,
ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstSubgroupIdx) const;
TString ToString() const;
+ bool HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const;
+
static TString SituationToString(ESituation situation);
};
@@ -228,6 +233,8 @@ struct TBlackboard {
}
}
+ void InvalidatePartStates(ui32 orderNumber);
+
void RegisterBlobForPut(const TLogoBlobID& id, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span);
TBlobState& operator [](const TLogoBlobID& id);
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
index 35d9cc681d..39112508ae 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
@@ -70,6 +70,12 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
bool RequireExtraBlockChecks = false;
+ struct TIncarnationRecord {
+ ui64 IncarnationGuid;
+ TMonotonic ExpirationTimestamp;
+ };
+ std::vector<std::optional<TIncarnationRecord>> IncarnationRecords;
+
void SanityCheck() {
if (RequestsSent <= MaxSaneRequests) {
return;
@@ -89,29 +95,58 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
SanityCheck(); // May Die
}
+ template<typename TEvent, typename TCookie>
+ void SendPuts(auto&& callback) {
+ TDeque<std::unique_ptr<TEvent>> v;
+ callback(v);
+ UpdatePengingVDiskResponseCount<TEvent, TCookie>(v);
+ RequestsSent += v.size();
+ CountPuts(v);
+ SendToQueues(v, TimeStatsEnabled);
+ }
+
void Accelerate() {
if (IsAccelerated) {
return;
}
IsAccelerated = true;
+ auto callback = [this](auto& v) {
+ PutImpl.Accelerate(LogCtx, v);
+ *(IsMultiPutMode ? Mon->NodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size();
+ };
+
if (IsMultiPutMode) {
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts;
- PutImpl.Accelerate(LogCtx, vMultiPuts);
- UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(vMultiPuts);
- RequestsSent += vMultiPuts.size();
- *Mon->NodeMon->AccelerateEvVMultiPutCount += vMultiPuts.size();
- CountPuts(vMultiPuts);
- SendToQueues(vMultiPuts, TimeStatsEnabled);
+ SendPuts<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(callback);
} else {
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
- PutImpl.Accelerate(LogCtx, vPuts);
- UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVPut, TBlobCookie>(vPuts);
- RequestsSent += vPuts.size();
- *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size();
- CountPuts(vPuts);
- SendToQueues(vPuts, TimeStatsEnabled);
+ SendPuts<TEvBlobStorage::TEvVPut, TBlobCookie>(callback);
+ }
+ }
+
+ void HandleIncarnation(TMonotonic timestamp, ui32 orderNumber, ui64 incarnationGuid) {
+ Y_ABORT_UNLESS(orderNumber < IncarnationRecords.size());
+ if (auto& record = IncarnationRecords[orderNumber]; !record) {
+ record = TIncarnationRecord{
+ .IncarnationGuid = incarnationGuid,
+ .ExpirationTimestamp = timestamp,
+ };
+ } else if (record->IncarnationGuid != incarnationGuid) {
+ PutImpl.InvalidatePartStates(orderNumber);
+ record->IncarnationGuid = incarnationGuid;
+ record->ExpirationTimestamp = timestamp;
+ } else if (record->ExpirationTimestamp < timestamp) {
+ record->ExpirationTimestamp = timestamp;
+ }
+ }
+
+ TBlobStorageGroupInfo::TGroupVDisks CreateExpiredVDiskSet(TMonotonic timestamp) const {
+ TBlobStorageGroupInfo::TGroupVDisks res(&Info->GetTopology());
+ for (ui32 i = 0; i < IncarnationRecords.size(); ++i) {
+ if (auto& record = IncarnationRecords[i]; record && record->ExpirationTimestamp <= timestamp) {
+ res |= {&Info->GetTopology(), Info->GetVDiskId(i)};
+ }
}
+ return res;
}
void Handle(TEvBlobStorage::TEvVPutResult::TPtr &ev) {
@@ -148,6 +183,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID());
const TVDiskIdShort shortId(vDiskId);
+ if (record.HasIncarnationGuid()) {
+ HandleIncarnation(TActivationContext::Monotonic(), Info->GetOrderNumber(shortId), record.GetIncarnationGuid());
+ }
+
LWPROBE(DSProxyVDiskRequestDuration, TEvBlobStorage::EvVPut, blob.BlobSize(), blob.TabletID(),
Info->GroupID, blob.Channel(), Info->GetFailDomainOrderNumber(shortId),
GetStartTime(record.GetTimestamps()),
@@ -162,13 +201,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
GetVDiskTimeMs(record.GetTimestamps()));
}
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
TPutImpl::TPutResultVec putResults;
- PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), vPuts, putResults);
- UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVPut, TBlobCookie>(vPuts);
- RequestsSent += vPuts.size();
- CountPuts(vPuts);
- SendToQueues(vPuts, TimeStatsEnabled);
+ SendPuts<TEvBlobStorage::TEvVPut, TBlobCookie>([&](auto& v) {
+ PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), v, putResults);
+ });
if (ReplyAndDieWithLastResponse(putResults)) {
return;
}
@@ -209,6 +245,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID());
const TVDiskIdShort shortId(vDiskId);
+ if (record.HasIncarnationGuid()) {
+ HandleIncarnation(TActivationContext::Monotonic(), Info->GetOrderNumber(shortId), record.GetIncarnationGuid());
+ }
+
Y_ABORT_UNLESS(record.HasCookie());
TVMultiPutCookie cookie(record.GetCookie());
const ui64 vdisk = cookie.GetVDiskOrderNumber();
@@ -277,12 +317,9 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
}
putResults.clear();
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts;
- PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), vMultiPuts, putResults);
- UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(vMultiPuts);
- RequestsSent += vMultiPuts.size();
- CountPuts(vMultiPuts);
- SendToQueues(vMultiPuts, TimeStatsEnabled);
+ SendPuts<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>([&](auto& v) {
+ PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), v, putResults);
+ });
if (ReplyAndDieWithLastResponse(putResults)) {
return;
}
@@ -443,6 +480,7 @@ public:
, IsAccelerateScheduled(false)
, IsMultiPutMode(false)
, RequireExtraBlockChecks(!ev->ExtraBlockChecks.empty())
+ , IncarnationRecords(info->GetTotalVDisksNum())
{
if (ev->Orbit.HasShuttles()) {
RootCauseTrack.IsOn = true;
@@ -486,6 +524,7 @@ public:
, IsAccelerated(false)
, IsAccelerateScheduled(false)
, IsMultiPutMode(true)
+ , IncarnationRecords(info->GetTotalVDisksNum())
{
Y_DEBUG_ABORT_UNLESS(events.size() <= MaxBatchedPutRequests);
for (auto &ev : events) {
@@ -606,24 +645,13 @@ public:
}
}
- template<typename TEvV, typename TCookie>
- struct TIssue {
- void operator ()(TThis& self) {
- TDeque<std::unique_ptr<TEvV>> events;
- self.PutImpl.GenerateInitialRequests(self.LogCtx, self.PartSets, events);
- self.UpdatePengingVDiskResponseCount<TEvV, TCookie>(events);
- self.RequestsSent += events.size();
- self.CountPuts(events);
- self.SendToQueues(events, self.TimeStatsEnabled);
- }
- };
-
void ResumeBootstrap() {
if (EncodeQuantum()) {
+ auto callback = [this](auto& v) { PutImpl.GenerateInitialRequests(LogCtx, PartSets, v); };
if (IsMultiPutMode) {
- TIssue<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>()(*this);
+ SendPuts<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(callback);
} else {
- TIssue<TEvBlobStorage::TEvVPut, TBlobCookie>()(*this);
+ SendPuts<TEvBlobStorage::TEvVPut, TBlobCookie>(callback);
}
} else {
TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvResume, 0, SelfId(), {}, nullptr, 0));
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
index 53f2483a80..e883bb960c 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
@@ -427,10 +427,13 @@ public:
TString DumpFullState() const;
bool MarkBlobAsSent(ui64 blobIdx);
- bool MarkBlobAsSent(TMap<TLogoBlobID, TBlobState>::iterator it);
TString ToString() const;
+ void InvalidatePartStates(ui32 orderNumber) {
+ Blackboard.InvalidatePartStates(orderNumber);
+ }
+
protected:
bool RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults);
bool RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults);