diff options
author | alexvru <alexvru@ydb.tech> | 2023-01-20 14:19:42 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-01-20 14:19:42 +0300 |
commit | 429aca7da4af518479dfeffceafd89221bda56d7 (patch) | |
tree | e470a132e4fc542b19cdc2f34c36afd809a7731c | |
parent | b6cea35840afb1e5f170db886c0706d27b79bae2 (diff) | |
download | ydb-429aca7da4af518479dfeffceafd89221bda56d7.tar.gz |
Fix BlobDepot decommission mode bugs
-rw-r--r-- | ydb/core/base/blobstorage.h | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 5 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 11 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_resolve.cpp | 264 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_range.cpp | 4 |
6 files changed, 185 insertions, 105 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 555baa1213..bbbe141609 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -1788,9 +1788,11 @@ struct TEvBlobStorage { TResponse() {} - TResponse(const TLogoBlobID &id, const TString &x) + TResponse(const TLogoBlobID &id, const TString &x, bool keep = false, bool doNotKeep = false) : Id(id) , Buffer(x) + , Keep(keep) + , DoNotKeep(doNotKeep) {} }; diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index 8c5e1627a2..86661f9590 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -88,7 +88,7 @@ namespace NKikimr::NBlobDepot { hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle); hFunc(TEvBlobStorage::TEvRangeResult, Data->Handle); - hFunc(TEvBlobStorage::TEvGetResult, Data->UncertaintyResolver->Handle); + hFunc(TEvBlobStorage::TEvGetResult, Data->Handle); hFunc(TEvBlobStorage::TEvStatusResult, SpaceMonitor->Handle); cFunc(TEvPrivate::EvKickSpaceMonitor, KickSpaceMonitor); diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index dcb728a77a..fd1c25f69f 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -352,8 +352,9 @@ namespace NKikimr::NBlobDepot { } bool TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, - NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { - Y_VERIFY_S(Loaded || IsKeyLoaded(TKey(blob.Id)), "Id# " << Self->GetLogId() << " Blob# " << blob.ToString()); + NTabletFlatExecutor::TTransactionContext& txc, void *cookie, bool suppressLoadedCheck) { + Y_VERIFY_S(suppressLoadedCheck || Loaded || IsKeyLoaded(TKey(blob.Id)), "Id# " << Self->GetLogId() + << " Blob# " << blob.ToString()); return UpdateKey(TKey(blob.Id), txc, cookie, "AddDataOnDecommit", [&](TValue& value, bool inserted) { bool change = inserted; diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 536aafd09b..33a883c055 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -363,9 +363,10 @@ namespace NKikimr::NBlobDepot { TEvBlobDepot::TEvResolve::TPtr Ev; // original resolve request ui32 NumRangesInFlight; std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs = {}; - std::vector<std::tuple<TLogoBlobID, TLogoBlobID>> Errors = {}; + std::vector<TKey> ResolutionErrors = {}; + std::deque<TKey> DropNodataBlobs = {}; }; - ui64 LastRangeId = 0; + ui64 LastResolveQueryId = 0; THashMap<ui64, TResolveDecommitContext> ResolveDecommitContexts; class TTxIssueGC; @@ -456,7 +457,7 @@ namespace NKikimr::NBlobDepot { void AddLoadSkip(TKey key); void AddDataOnLoad(TKey key, TString value, bool uncertainWrite, bool skip); bool AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, - NTabletFlatExecutor::TTransactionContext& txc, void *cookie); + NTabletFlatExecutor::TTransactionContext& txc, void *cookie, bool suppressLoadedCheck = false); void AddTrashOnLoad(TLogoBlobID id); void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep); @@ -501,6 +502,10 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobDepot::TEvResolve::TPtr ev); void Handle(TEvBlobStorage::TEvRangeResult::TPtr ev); + void Handle(TEvBlobStorage::TEvGetResult::TPtr ev); + + template<typename TCallback> + void HandleResolutionResult(ui64 id, TCallback&& callback); ui64 GetTotalStoredDataSize() const { return TotalStoredDataSize; diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp index ffb2e460f8..1f25d09a43 100644 --- a/ydb/core/blob_depot/data_resolve.cpp +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -83,8 +83,7 @@ namespace NKikimr::NBlobDepot { class TData::TTxResolve : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { std::unique_ptr<TEvBlobDepot::TEvResolve::THandle> Request; - std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs; - std::vector<std::tuple<TLogoBlobID, TLogoBlobID>> Errors; + TResolveDecommitContext ResolveDecommitContext; bool KeysLoaded = false; int ItemIndex = 0; @@ -100,20 +99,17 @@ namespace NKikimr::NBlobDepot { TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_RESOLVE; } TTxResolve(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr request, - std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob>&& decommitBlobs = {}, - std::vector<std::tuple<TLogoBlobID, TLogoBlobID>>&& errors = {}) + TResolveDecommitContext&& resolveDecommitContext = {}) : TTransactionBase(self) , Request(request.Release()) - , DecommitBlobs(std::move(decommitBlobs)) - , Errors(std::move(errors)) + , ResolveDecommitContext(std::move(resolveDecommitContext)) , Result(*Request) {} TTxResolve(TTxResolve& predecessor) : TTransactionBase(predecessor.Self) , Request(std::move(predecessor.Request)) - , DecommitBlobs(std::move(predecessor.DecommitBlobs)) - , Errors(std::move(predecessor.Errors)) + , ResolveDecommitContext(std::move(predecessor.ResolveDecommitContext)) , KeysLoaded(predecessor.KeysLoaded) , ItemIndex(predecessor.ItemIndex) , LastScannedKey(std::move(predecessor.LastScannedKey)) @@ -161,7 +157,7 @@ namespace NKikimr::NBlobDepot { bool Execute(TTransactionContext& txc, const TActorContext&) override { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT22, "TTxResolve::Execute", (Id, Self->GetLogId()), (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex), - (LastScannedKey, LastScannedKey), (DecommitBlobs.size, DecommitBlobs.size())); + (LastScannedKey, LastScannedKey), (DecommitBlobs.size, ResolveDecommitContext.DecommitBlobs.size())); bool progress = false; if (!KeysLoaded && !LoadKeys(txc, progress)) { @@ -172,16 +168,33 @@ namespace NKikimr::NBlobDepot { KeysLoaded = true; } - for (ui32 numItemsRemain = 10'000; !DecommitBlobs.empty(); DecommitBlobs.pop_front()) { - if (numItemsRemain) { - const auto& blob = DecommitBlobs.front(); - if (Self->Data->LastAssimilatedBlobId < blob.Id) { - numItemsRemain -= Self->Data->AddDataOnDecommit(DecommitBlobs.front(), txc, this); - } - } else { + ui32 numItemsRemain = 10'000; + + while (!ResolveDecommitContext.DecommitBlobs.empty()) { + if (!numItemsRemain) { + SuccessorTx = true; + return true; + } + + const auto& blob = ResolveDecommitContext.DecommitBlobs.front(); + if (Self->Data->LastAssimilatedBlobId < blob.Id) { + numItemsRemain -= Self->Data->AddDataOnDecommit(blob, txc, this, true); + } + ResolveDecommitContext.DecommitBlobs.pop_front(); + } + + while (!ResolveDecommitContext.DropNodataBlobs.empty()) { + if (!numItemsRemain) { SuccessorTx = true; return true; } + + const TKey& key = ResolveDecommitContext.DropNodataBlobs.front(); + const TValue *value = Self->Data->FindKey(key); + if (value && value->GoingToAssimilate) { + Self->Data->DeleteKey(key, txc, this); + } + ResolveDecommitContext.DropNodataBlobs.pop_front(); } const auto& record = Request->Get()->Record; @@ -253,6 +266,8 @@ namespace NKikimr::NBlobDepot { } if (end && Self->Data->LastLoadedKey && *end <= *Self->Data->LastLoadedKey) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT76, "TTxResolve: skipping subrange", (Id, Self->GetLogId()), + (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex)); continue; // key is already loaded } @@ -262,9 +277,13 @@ namespace NKikimr::NBlobDepot { // special case -- forward scan and we have some data in memory auto callback = [&](const TKey& key, const TValue& /*value*/) { LastScannedKey = key; - return !DecommitBlobs.empty() || ++NumKeysRead != maxKeys; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT83, "TTxResolve: skipping key in memory", (Id, Self->GetLogId()), + (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex), (Key, key)); + return !ResolveDecommitContext.DecommitBlobs.empty() || ++NumKeysRead != maxKeys; }; if (!Self->Data->ScanRange(begin, Self->Data->LastLoadedKey, flags | EScanFlags::INCLUDE_END, callback)) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT84, "TTxResolve: skipping remaining subrange", (Id, Self->GetLogId()), + (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex)); continue; // we have read all the keys required (MaxKeys limit hit) } @@ -286,7 +305,13 @@ namespace NKikimr::NBlobDepot { LastScannedKey = key; progress = true; - if (!Self->Data->IsKeyLoaded(key)) { + const bool isKeyLoaded = Self->Data->IsKeyLoaded(key); + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT85, "TTxResolve: loading key from database", (Id, Self->GetLogId()), + (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex), (Key, key), + (IsKeyLoaded, isKeyLoaded)); + + if (!isKeyLoaded) { Self->Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>(), rowset.template GetValueOrDefault<Schema::Data::UncertainWrite>(), true); } @@ -294,9 +319,7 @@ namespace NKikimr::NBlobDepot { const bool matchBegin = !begin || (flags & EScanFlags::INCLUDE_BEGIN ? *begin <= key : *begin < key); const bool matchEnd = !end || (flags & EScanFlags::INCLUDE_END ? key <= *end : key < *end); if (matchBegin && matchEnd) { - const TValue *value = Self->Data->FindKey(key); - Y_VERIFY(value); // value must exist as it was just loaded into memory and exists in the database - if (DecommitBlobs.empty() && ++NumKeysRead == maxKeys) { + if (ResolveDecommitContext.DecommitBlobs.empty() && ++NumKeysRead == maxKeys) { // we have hit the MaxItems limit, exit return true; } @@ -369,20 +392,10 @@ namespace NKikimr::NBlobDepot { item.SetMeta(value.Meta.data(), value.Meta.size()); } - bool foundError = false; - - if (!Errors.empty()) { - const TLogoBlobID id = key.GetBlobId(); - for (const auto& [from, to] : Errors) { - if (from <= id && id <= to) { - foundError = true; - break; - } - } - } - - if (foundError) { + const auto& errors = ResolveDecommitContext.ResolutionErrors; + if (std::binary_search(errors.begin(), errors.end(), key)) { item.SetErrorReason("item resolution error"); + item.ClearValueChain(); } else if (!item.ValueChainSize()) { STLOG(PRI_WARN, BLOB_DEPOT, BDT48, "empty ValueChain on Resolve", (Id, Self->GetLogId()), (Key, key), (Value, value), (Item, item), (Sender, Request->Sender), (Cookie, Request->Cookie)); @@ -400,76 +413,97 @@ namespace NKikimr::NBlobDepot { (Sender, ev->Sender), (Cookie, ev->Cookie), (LastAssimilatedBlobId, LastAssimilatedBlobId)); if (Self->Config.GetIsDecommittingGroup() && Self->DecommitState <= EDecommitState::BlobsFinished) { - std::vector<std::tuple<ui64, bool, TLogoBlobID, TLogoBlobID>> queries; + std::vector<std::unique_ptr<IEventBase>> outbox; + std::unordered_map<std::tuple<ui64, bool>, std::vector<TLogoBlobID>> getBlobIds; + const ui64 id = LastResolveQueryId + 1; for (const auto& item : ev->Get()->Record.GetItems()) { - if (!item.HasTabletId()) { - STLOG(PRI_CRIT, BLOB_DEPOT, BDT42, "incorrect request", (Id, Self->GetLogId()), (Item, item)); - auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, NKikimrProto::ERROR, - "incorrect request"); - TActivationContext::Send(response.release()); - return; - } - - const ui64 tabletId = item.GetTabletId(); - if (LastAssimilatedBlobId && tabletId < LastAssimilatedBlobId->TabletID()) { - continue; // fast path - } - - TLogoBlobID minId(tabletId, 0, 0, 0, 0, 0); - TLogoBlobID maxId(tabletId, Max<ui32>(), Max<ui32>(), TLogoBlobID::MaxChannel, TLogoBlobID::MaxBlobSize, - TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode); - switch (item.GetKeyDesignatorCase()) { case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: { - const auto& range = item.GetKeyRange(); - if (range.HasBeginningKey()) { - minId = TKey::FromBinaryKey(range.GetBeginningKey(), Self->Config).GetBlobId(); + if (!item.HasTabletId()) { + STLOG(PRI_CRIT, BLOB_DEPOT, BDT42, "incorrect request", (Id, Self->GetLogId()), (Item, item)); + auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, NKikimrProto::ERROR, + "incorrect request"); + TActivationContext::Send(response.release()); + return; + } + + const ui64 tabletId = item.GetTabletId(); + if (LastAssimilatedBlobId && tabletId < LastAssimilatedBlobId->TabletID()) { + continue; // fast path } - if (range.HasEndingKey()) { - maxId = TKey::FromBinaryKey(range.GetEndingKey(), Self->Config).GetBlobId(); + + const auto& range = item.GetKeyRange(); + + TLogoBlobID minId = range.HasBeginningKey() + ? TKey::FromBinaryKey(range.GetBeginningKey(), Self->Config).GetBlobId() + : TLogoBlobID(tabletId, 0, 0, 0, 0, 0); + + TLogoBlobID maxId = range.HasEndingKey() + ? TKey::FromBinaryKey(range.GetEndingKey(), Self->Config).GetBlobId() + : TLogoBlobID(tabletId, Max<ui32>(), Max<ui32>(), TLogoBlobID::MaxChannel, + TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, + TLogoBlobID::MaxCrcMode); + + Y_VERIFY(minId <= maxId); + + if (LastAssimilatedBlobId && maxId <= *LastAssimilatedBlobId) { + continue; // we have this range fully assimilated, just skip } + + // adjust minId to skip already assimilated items + minId = Max(minId, LastAssimilatedBlobId.value_or(minId)); + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "going to TEvRange", (Id, Self->GetLogId()), (TabletId, tabletId), + (MinId, minId), (MaxId, maxId), (MustRestoreFirst, item.GetMustRestoreFirst()), (Cookie, id)); + + auto ev = std::make_unique<TEvBlobStorage::TEvRange>(tabletId, minId, maxId, + item.GetMustRestoreFirst(), TInstant::Max(), true); + ev->Decommission = true; + outbox.push_back(std::move(ev)); + break; } - case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: - minId = maxId = TKey::FromBinaryKey(item.GetExactKey(), Self->Config).GetBlobId(); + case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: { + const TLogoBlobID blobId = TKey::FromBinaryKey(item.GetExactKey(), Self->Config).GetBlobId(); + const auto key = std::make_tuple(blobId.TabletID(), item.GetMustRestoreFirst()); + getBlobIds[key].push_back(blobId); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT86, "going to TEvGet", (Id, Self->GetLogId()), + (BlobId, blobId), (MustRestoreFirst, item.GetMustRestoreFirst()), (Cookie, id)); break; + } case NKikimrBlobDepot::TEvResolve::TItem::KEYDESIGNATOR_NOT_SET: Y_VERIFY_DEBUG(false); break; } - Y_VERIFY_DEBUG(minId.TabletID() == tabletId); - Y_VERIFY_DEBUG(maxId.TabletID() == tabletId); + } - if (!LastAssimilatedBlobId || *LastAssimilatedBlobId < maxId) { - if (LastAssimilatedBlobId && minId < *LastAssimilatedBlobId) { - minId = *LastAssimilatedBlobId; - } - if (minId == maxId) { - const auto it = Data.find(TKey(minId)); - if (it != Data.end() && !it->second.GoingToAssimilate) { - continue; // fast path for extreme queries - } - } - queries.emplace_back(tabletId, item.GetMustRestoreFirst(), minId, maxId); + // convert all gets to events in outbox + for (auto& [key, blobIds] : getBlobIds) { + const auto& [tabletId, mustRestoreFirst] = key; + const ui32 sz = blobIds.size(); + TArrayHolder<TEvBlobStorage::TEvGet::TQuery> q(new TEvBlobStorage::TEvGet::TQuery[sz]); + for (ui32 i = 0; i < sz; ++i) { + q[i].Set(blobIds[i]); } + auto ev = std::make_unique<TEvBlobStorage::TEvGet>(q, sz, TInstant::Max(), + NKikimrBlobStorage::EGetHandleClass::FastRead, mustRestoreFirst, true); + ev->Decommission = true; + outbox.emplace_back(std::move(ev)); } - if (!queries.empty()) { - const ui64 id = ++LastRangeId; - for (const auto& [tabletId, mustRestoreFirst, minId, maxId] : queries) { - auto ev = std::make_unique<TEvBlobStorage::TEvRange>(tabletId, minId, maxId, mustRestoreFirst, - TInstant::Max(), true); - ev->Decommission = true; + if (!outbox.empty()) { + ResolveDecommitContexts[id] = {ev, (ui32)outbox.size()}; + ++LastResolveQueryId; + Y_VERIFY(LastResolveQueryId == id); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "going to TEvRange", (Id, Self->GetLogId()), (TabletId, tabletId), - (MinId, minId), (MaxId, maxId), (MustRestoreFirst, mustRestoreFirst), (Cookie, id)); + for (auto& ev : outbox) { SendToBSProxy(Self->SelfId(), Self->Config.GetVirtualGroupId(), ev.release(), id); } - ResolveDecommitContexts[id] = {ev, (ui32)queries.size()}; + return; } } @@ -477,30 +511,70 @@ namespace NKikimr::NBlobDepot { Self->Execute(std::make_unique<TTxResolve>(Self, ev)); } + template<typename TCallback> + void TData::HandleResolutionResult(ui64 id, TCallback&& callback) { + auto& contexts = Self->Data->ResolveDecommitContexts; + if (const auto it = contexts.find(id); it != contexts.end()) { + auto& context = it->second; + if (!callback(context)) { + STLOG(PRI_INFO, BLOB_DEPOT, BDT87, "failing TEvResolve", (Id, Self->GetLogId()), (Sender, context.Ev->Sender), + (Cookie, context.Ev->Cookie)); + auto [response, record] = TEvBlobDepot::MakeResponseFor(*context.Ev, NKikimrProto::ERROR, + "unassimilated key resolution error"); + TActivationContext::Send(response.release()); + } else if (!--context.NumRangesInFlight) { + auto ev = context.Ev; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT88, "actually starting TEvResolve", (Id, Self->GetLogId()), (Sender, ev->Sender), + (Cookie, ev->Cookie)); + std::sort(context.ResolutionErrors.begin(), context.ResolutionErrors.end()); + Self->Execute(std::make_unique<TTxResolve>(Self, ev, std::move(context))); + } else { + return; // keep context running + } + contexts.erase(it); + } + } + void TData::Handle(TEvBlobStorage::TEvRangeResult::TPtr ev) { auto& msg = *ev->Get(); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT50, "TEvRangeResult", (Id, Self->GetLogId()), (Msg, msg), (Cookie, ev->Cookie)); - auto& contexts = Self->Data->ResolveDecommitContexts; - if (const auto it = contexts.find(ev->Cookie); it != contexts.end()) { - auto& context = it->second; - + HandleResolutionResult(ev->Cookie, [&](auto& context) { if (msg.Status == NKikimrProto::OK) { - for (const auto& response : msg.Responses) { - if (LastAssimilatedBlobId < response.Id) { - context.DecommitBlobs.push_back({response.Id, response.Keep, response.DoNotKeep}); - } + for (const auto& r : msg.Responses) { + context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep}); } + return true; } else { - context.Errors.emplace_back(msg.From, msg.To); + // we can't be sure that there are no still unassimilated keys in this range, so we report ERROR to + // the whole range query; TODO(alexvru): provide a way to mark only specific request item as faulty one + return false; } + }); + } - if (!--context.NumRangesInFlight) { - Self->Execute(std::make_unique<TTxResolve>(Self, context.Ev, std::move(context.DecommitBlobs), - std::move(context.Errors))); - contexts.erase(it); - } + void TData::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { + if (!ev->Cookie) { + return UncertaintyResolver->Handle(ev); } + + auto& msg = *ev->Get(); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT55, "TEvGetResult", (Id, Self->GetLogId()), (Msg, msg), (Cookie, ev->Cookie)); + + HandleResolutionResult(ev->Cookie, [&](auto& context) { + for (ui32 i = 0; i < msg.ResponseSz; ++i) { + const auto& r = msg.Responses[i]; + if (r.Status == NKikimrProto::OK) { + context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep}); + } else if (r.Status == NKikimrProto::NODATA) { + context.DropNodataBlobs.push_back(TKey(r.Id)); + } else { + // mark this specific key as unresolvable + context.ResolutionErrors.push_back(TKey(r.Id)); + } + } + return true; + }); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp index 11f86adae9..fbcf731216 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp @@ -26,8 +26,6 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob const bool Decommission; TInstant StartTime; - TAutoPtr<TEvBlobStorage::TEvRangeResult> Reply; - TMap<TLogoBlobID, TBlobStatusTracker> BlobStatus; TBlobStorageGroupInfo::TGroupVDisks FailedDisks; @@ -285,7 +283,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob Y_VERIFY(response.Id == BlobsToGet[i].BlobId); if (getResult.Responses[i].Status == NKikimrProto::OK) { - result->Responses.emplace_back(response.Id, response.Buffer); + result->Responses.emplace_back(response.Id, std::move(response.Buffer), response.Keep, response.DoNotKeep); } else if (getResult.Responses[i].Status != NKikimrProto::NODATA || BlobsToGet[i].RequiredToBePresent) { // it's okay to get NODATA if blob wasn't confirmed -- this blob is simply thrown out of resulting // set; otherwise we return error about lost data |