aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-01-20 14:19:42 +0300
committeralexvru <alexvru@ydb.tech>2023-01-20 14:19:42 +0300
commit429aca7da4af518479dfeffceafd89221bda56d7 (patch)
treee470a132e4fc542b19cdc2f34c36afd809a7731c
parentb6cea35840afb1e5f170db886c0706d27b79bae2 (diff)
downloadydb-429aca7da4af518479dfeffceafd89221bda56d7.tar.gz
Fix BlobDepot decommission mode bugs
-rw-r--r--ydb/core/base/blobstorage.h4
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp2
-rw-r--r--ydb/core/blob_depot/data.cpp5
-rw-r--r--ydb/core/blob_depot/data.h11
-rw-r--r--ydb/core/blob_depot/data_resolve.cpp264
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_range.cpp4
6 files changed, 185 insertions, 105 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 555baa1213..bbbe141609 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -1788,9 +1788,11 @@ struct TEvBlobStorage {
TResponse()
{}
- TResponse(const TLogoBlobID &id, const TString &x)
+ TResponse(const TLogoBlobID &id, const TString &x, bool keep = false, bool doNotKeep = false)
: Id(id)
, Buffer(x)
+ , Keep(keep)
+ , DoNotKeep(doNotKeep)
{}
};
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index 8c5e1627a2..86661f9590 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -88,7 +88,7 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle);
hFunc(TEvBlobStorage::TEvRangeResult, Data->Handle);
- hFunc(TEvBlobStorage::TEvGetResult, Data->UncertaintyResolver->Handle);
+ hFunc(TEvBlobStorage::TEvGetResult, Data->Handle);
hFunc(TEvBlobStorage::TEvStatusResult, SpaceMonitor->Handle);
cFunc(TEvPrivate::EvKickSpaceMonitor, KickSpaceMonitor);
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index dcb728a77a..fd1c25f69f 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -352,8 +352,9 @@ namespace NKikimr::NBlobDepot {
}
bool TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
- NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
- Y_VERIFY_S(Loaded || IsKeyLoaded(TKey(blob.Id)), "Id# " << Self->GetLogId() << " Blob# " << blob.ToString());
+ NTabletFlatExecutor::TTransactionContext& txc, void *cookie, bool suppressLoadedCheck) {
+ Y_VERIFY_S(suppressLoadedCheck || Loaded || IsKeyLoaded(TKey(blob.Id)), "Id# " << Self->GetLogId()
+ << " Blob# " << blob.ToString());
return UpdateKey(TKey(blob.Id), txc, cookie, "AddDataOnDecommit", [&](TValue& value, bool inserted) {
bool change = inserted;
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 536aafd09b..33a883c055 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -363,9 +363,10 @@ namespace NKikimr::NBlobDepot {
TEvBlobDepot::TEvResolve::TPtr Ev; // original resolve request
ui32 NumRangesInFlight;
std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs = {};
- std::vector<std::tuple<TLogoBlobID, TLogoBlobID>> Errors = {};
+ std::vector<TKey> ResolutionErrors = {};
+ std::deque<TKey> DropNodataBlobs = {};
};
- ui64 LastRangeId = 0;
+ ui64 LastResolveQueryId = 0;
THashMap<ui64, TResolveDecommitContext> ResolveDecommitContexts;
class TTxIssueGC;
@@ -456,7 +457,7 @@ namespace NKikimr::NBlobDepot {
void AddLoadSkip(TKey key);
void AddDataOnLoad(TKey key, TString value, bool uncertainWrite, bool skip);
bool AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
- NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
+ NTabletFlatExecutor::TTransactionContext& txc, void *cookie, bool suppressLoadedCheck = false);
void AddTrashOnLoad(TLogoBlobID id);
void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep);
@@ -501,6 +502,10 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobDepot::TEvResolve::TPtr ev);
void Handle(TEvBlobStorage::TEvRangeResult::TPtr ev);
+ void Handle(TEvBlobStorage::TEvGetResult::TPtr ev);
+
+ template<typename TCallback>
+ void HandleResolutionResult(ui64 id, TCallback&& callback);
ui64 GetTotalStoredDataSize() const {
return TotalStoredDataSize;
diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp
index ffb2e460f8..1f25d09a43 100644
--- a/ydb/core/blob_depot/data_resolve.cpp
+++ b/ydb/core/blob_depot/data_resolve.cpp
@@ -83,8 +83,7 @@ namespace NKikimr::NBlobDepot {
class TData::TTxResolve : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
std::unique_ptr<TEvBlobDepot::TEvResolve::THandle> Request;
- std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs;
- std::vector<std::tuple<TLogoBlobID, TLogoBlobID>> Errors;
+ TResolveDecommitContext ResolveDecommitContext;
bool KeysLoaded = false;
int ItemIndex = 0;
@@ -100,20 +99,17 @@ namespace NKikimr::NBlobDepot {
TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_RESOLVE; }
TTxResolve(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr request,
- std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob>&& decommitBlobs = {},
- std::vector<std::tuple<TLogoBlobID, TLogoBlobID>>&& errors = {})
+ TResolveDecommitContext&& resolveDecommitContext = {})
: TTransactionBase(self)
, Request(request.Release())
- , DecommitBlobs(std::move(decommitBlobs))
- , Errors(std::move(errors))
+ , ResolveDecommitContext(std::move(resolveDecommitContext))
, Result(*Request)
{}
TTxResolve(TTxResolve& predecessor)
: TTransactionBase(predecessor.Self)
, Request(std::move(predecessor.Request))
- , DecommitBlobs(std::move(predecessor.DecommitBlobs))
- , Errors(std::move(predecessor.Errors))
+ , ResolveDecommitContext(std::move(predecessor.ResolveDecommitContext))
, KeysLoaded(predecessor.KeysLoaded)
, ItemIndex(predecessor.ItemIndex)
, LastScannedKey(std::move(predecessor.LastScannedKey))
@@ -161,7 +157,7 @@ namespace NKikimr::NBlobDepot {
bool Execute(TTransactionContext& txc, const TActorContext&) override {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT22, "TTxResolve::Execute", (Id, Self->GetLogId()),
(Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex),
- (LastScannedKey, LastScannedKey), (DecommitBlobs.size, DecommitBlobs.size()));
+ (LastScannedKey, LastScannedKey), (DecommitBlobs.size, ResolveDecommitContext.DecommitBlobs.size()));
bool progress = false;
if (!KeysLoaded && !LoadKeys(txc, progress)) {
@@ -172,16 +168,33 @@ namespace NKikimr::NBlobDepot {
KeysLoaded = true;
}
- for (ui32 numItemsRemain = 10'000; !DecommitBlobs.empty(); DecommitBlobs.pop_front()) {
- if (numItemsRemain) {
- const auto& blob = DecommitBlobs.front();
- if (Self->Data->LastAssimilatedBlobId < blob.Id) {
- numItemsRemain -= Self->Data->AddDataOnDecommit(DecommitBlobs.front(), txc, this);
- }
- } else {
+ ui32 numItemsRemain = 10'000;
+
+ while (!ResolveDecommitContext.DecommitBlobs.empty()) {
+ if (!numItemsRemain) {
+ SuccessorTx = true;
+ return true;
+ }
+
+ const auto& blob = ResolveDecommitContext.DecommitBlobs.front();
+ if (Self->Data->LastAssimilatedBlobId < blob.Id) {
+ numItemsRemain -= Self->Data->AddDataOnDecommit(blob, txc, this, true);
+ }
+ ResolveDecommitContext.DecommitBlobs.pop_front();
+ }
+
+ while (!ResolveDecommitContext.DropNodataBlobs.empty()) {
+ if (!numItemsRemain) {
SuccessorTx = true;
return true;
}
+
+ const TKey& key = ResolveDecommitContext.DropNodataBlobs.front();
+ const TValue *value = Self->Data->FindKey(key);
+ if (value && value->GoingToAssimilate) {
+ Self->Data->DeleteKey(key, txc, this);
+ }
+ ResolveDecommitContext.DropNodataBlobs.pop_front();
}
const auto& record = Request->Get()->Record;
@@ -253,6 +266,8 @@ namespace NKikimr::NBlobDepot {
}
if (end && Self->Data->LastLoadedKey && *end <= *Self->Data->LastLoadedKey) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT76, "TTxResolve: skipping subrange", (Id, Self->GetLogId()),
+ (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex));
continue; // key is already loaded
}
@@ -262,9 +277,13 @@ namespace NKikimr::NBlobDepot {
// special case -- forward scan and we have some data in memory
auto callback = [&](const TKey& key, const TValue& /*value*/) {
LastScannedKey = key;
- return !DecommitBlobs.empty() || ++NumKeysRead != maxKeys;
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT83, "TTxResolve: skipping key in memory", (Id, Self->GetLogId()),
+ (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex), (Key, key));
+ return !ResolveDecommitContext.DecommitBlobs.empty() || ++NumKeysRead != maxKeys;
};
if (!Self->Data->ScanRange(begin, Self->Data->LastLoadedKey, flags | EScanFlags::INCLUDE_END, callback)) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT84, "TTxResolve: skipping remaining subrange", (Id, Self->GetLogId()),
+ (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex));
continue; // we have read all the keys required (MaxKeys limit hit)
}
@@ -286,7 +305,13 @@ namespace NKikimr::NBlobDepot {
LastScannedKey = key;
progress = true;
- if (!Self->Data->IsKeyLoaded(key)) {
+ const bool isKeyLoaded = Self->Data->IsKeyLoaded(key);
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT85, "TTxResolve: loading key from database", (Id, Self->GetLogId()),
+ (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex), (Key, key),
+ (IsKeyLoaded, isKeyLoaded));
+
+ if (!isKeyLoaded) {
Self->Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>(),
rowset.template GetValueOrDefault<Schema::Data::UncertainWrite>(), true);
}
@@ -294,9 +319,7 @@ namespace NKikimr::NBlobDepot {
const bool matchBegin = !begin || (flags & EScanFlags::INCLUDE_BEGIN ? *begin <= key : *begin < key);
const bool matchEnd = !end || (flags & EScanFlags::INCLUDE_END ? key <= *end : key < *end);
if (matchBegin && matchEnd) {
- const TValue *value = Self->Data->FindKey(key);
- Y_VERIFY(value); // value must exist as it was just loaded into memory and exists in the database
- if (DecommitBlobs.empty() && ++NumKeysRead == maxKeys) {
+ if (ResolveDecommitContext.DecommitBlobs.empty() && ++NumKeysRead == maxKeys) {
// we have hit the MaxItems limit, exit
return true;
}
@@ -369,20 +392,10 @@ namespace NKikimr::NBlobDepot {
item.SetMeta(value.Meta.data(), value.Meta.size());
}
- bool foundError = false;
-
- if (!Errors.empty()) {
- const TLogoBlobID id = key.GetBlobId();
- for (const auto& [from, to] : Errors) {
- if (from <= id && id <= to) {
- foundError = true;
- break;
- }
- }
- }
-
- if (foundError) {
+ const auto& errors = ResolveDecommitContext.ResolutionErrors;
+ if (std::binary_search(errors.begin(), errors.end(), key)) {
item.SetErrorReason("item resolution error");
+ item.ClearValueChain();
} else if (!item.ValueChainSize()) {
STLOG(PRI_WARN, BLOB_DEPOT, BDT48, "empty ValueChain on Resolve", (Id, Self->GetLogId()),
(Key, key), (Value, value), (Item, item), (Sender, Request->Sender), (Cookie, Request->Cookie));
@@ -400,76 +413,97 @@ namespace NKikimr::NBlobDepot {
(Sender, ev->Sender), (Cookie, ev->Cookie), (LastAssimilatedBlobId, LastAssimilatedBlobId));
if (Self->Config.GetIsDecommittingGroup() && Self->DecommitState <= EDecommitState::BlobsFinished) {
- std::vector<std::tuple<ui64, bool, TLogoBlobID, TLogoBlobID>> queries;
+ std::vector<std::unique_ptr<IEventBase>> outbox;
+ std::unordered_map<std::tuple<ui64, bool>, std::vector<TLogoBlobID>> getBlobIds;
+ const ui64 id = LastResolveQueryId + 1;
for (const auto& item : ev->Get()->Record.GetItems()) {
- if (!item.HasTabletId()) {
- STLOG(PRI_CRIT, BLOB_DEPOT, BDT42, "incorrect request", (Id, Self->GetLogId()), (Item, item));
- auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, NKikimrProto::ERROR,
- "incorrect request");
- TActivationContext::Send(response.release());
- return;
- }
-
- const ui64 tabletId = item.GetTabletId();
- if (LastAssimilatedBlobId && tabletId < LastAssimilatedBlobId->TabletID()) {
- continue; // fast path
- }
-
- TLogoBlobID minId(tabletId, 0, 0, 0, 0, 0);
- TLogoBlobID maxId(tabletId, Max<ui32>(), Max<ui32>(), TLogoBlobID::MaxChannel, TLogoBlobID::MaxBlobSize,
- TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode);
-
switch (item.GetKeyDesignatorCase()) {
case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: {
- const auto& range = item.GetKeyRange();
- if (range.HasBeginningKey()) {
- minId = TKey::FromBinaryKey(range.GetBeginningKey(), Self->Config).GetBlobId();
+ if (!item.HasTabletId()) {
+ STLOG(PRI_CRIT, BLOB_DEPOT, BDT42, "incorrect request", (Id, Self->GetLogId()), (Item, item));
+ auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, NKikimrProto::ERROR,
+ "incorrect request");
+ TActivationContext::Send(response.release());
+ return;
+ }
+
+ const ui64 tabletId = item.GetTabletId();
+ if (LastAssimilatedBlobId && tabletId < LastAssimilatedBlobId->TabletID()) {
+ continue; // fast path
}
- if (range.HasEndingKey()) {
- maxId = TKey::FromBinaryKey(range.GetEndingKey(), Self->Config).GetBlobId();
+
+ const auto& range = item.GetKeyRange();
+
+ TLogoBlobID minId = range.HasBeginningKey()
+ ? TKey::FromBinaryKey(range.GetBeginningKey(), Self->Config).GetBlobId()
+ : TLogoBlobID(tabletId, 0, 0, 0, 0, 0);
+
+ TLogoBlobID maxId = range.HasEndingKey()
+ ? TKey::FromBinaryKey(range.GetEndingKey(), Self->Config).GetBlobId()
+ : TLogoBlobID(tabletId, Max<ui32>(), Max<ui32>(), TLogoBlobID::MaxChannel,
+ TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId,
+ TLogoBlobID::MaxCrcMode);
+
+ Y_VERIFY(minId <= maxId);
+
+ if (LastAssimilatedBlobId && maxId <= *LastAssimilatedBlobId) {
+ continue; // we have this range fully assimilated, just skip
}
+
+ // adjust minId to skip already assimilated items
+ minId = Max(minId, LastAssimilatedBlobId.value_or(minId));
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "going to TEvRange", (Id, Self->GetLogId()), (TabletId, tabletId),
+ (MinId, minId), (MaxId, maxId), (MustRestoreFirst, item.GetMustRestoreFirst()), (Cookie, id));
+
+ auto ev = std::make_unique<TEvBlobStorage::TEvRange>(tabletId, minId, maxId,
+ item.GetMustRestoreFirst(), TInstant::Max(), true);
+ ev->Decommission = true;
+ outbox.push_back(std::move(ev));
+
break;
}
- case NKikimrBlobDepot::TEvResolve::TItem::kExactKey:
- minId = maxId = TKey::FromBinaryKey(item.GetExactKey(), Self->Config).GetBlobId();
+ case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: {
+ const TLogoBlobID blobId = TKey::FromBinaryKey(item.GetExactKey(), Self->Config).GetBlobId();
+ const auto key = std::make_tuple(blobId.TabletID(), item.GetMustRestoreFirst());
+ getBlobIds[key].push_back(blobId);
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT86, "going to TEvGet", (Id, Self->GetLogId()),
+ (BlobId, blobId), (MustRestoreFirst, item.GetMustRestoreFirst()), (Cookie, id));
break;
+ }
case NKikimrBlobDepot::TEvResolve::TItem::KEYDESIGNATOR_NOT_SET:
Y_VERIFY_DEBUG(false);
break;
}
- Y_VERIFY_DEBUG(minId.TabletID() == tabletId);
- Y_VERIFY_DEBUG(maxId.TabletID() == tabletId);
+ }
- if (!LastAssimilatedBlobId || *LastAssimilatedBlobId < maxId) {
- if (LastAssimilatedBlobId && minId < *LastAssimilatedBlobId) {
- minId = *LastAssimilatedBlobId;
- }
- if (minId == maxId) {
- const auto it = Data.find(TKey(minId));
- if (it != Data.end() && !it->second.GoingToAssimilate) {
- continue; // fast path for extreme queries
- }
- }
- queries.emplace_back(tabletId, item.GetMustRestoreFirst(), minId, maxId);
+ // convert all gets to events in outbox
+ for (auto& [key, blobIds] : getBlobIds) {
+ const auto& [tabletId, mustRestoreFirst] = key;
+ const ui32 sz = blobIds.size();
+ TArrayHolder<TEvBlobStorage::TEvGet::TQuery> q(new TEvBlobStorage::TEvGet::TQuery[sz]);
+ for (ui32 i = 0; i < sz; ++i) {
+ q[i].Set(blobIds[i]);
}
+ auto ev = std::make_unique<TEvBlobStorage::TEvGet>(q, sz, TInstant::Max(),
+ NKikimrBlobStorage::EGetHandleClass::FastRead, mustRestoreFirst, true);
+ ev->Decommission = true;
+ outbox.emplace_back(std::move(ev));
}
- if (!queries.empty()) {
- const ui64 id = ++LastRangeId;
- for (const auto& [tabletId, mustRestoreFirst, minId, maxId] : queries) {
- auto ev = std::make_unique<TEvBlobStorage::TEvRange>(tabletId, minId, maxId, mustRestoreFirst,
- TInstant::Max(), true);
- ev->Decommission = true;
+ if (!outbox.empty()) {
+ ResolveDecommitContexts[id] = {ev, (ui32)outbox.size()};
+ ++LastResolveQueryId;
+ Y_VERIFY(LastResolveQueryId == id);
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "going to TEvRange", (Id, Self->GetLogId()), (TabletId, tabletId),
- (MinId, minId), (MaxId, maxId), (MustRestoreFirst, mustRestoreFirst), (Cookie, id));
+ for (auto& ev : outbox) {
SendToBSProxy(Self->SelfId(), Self->Config.GetVirtualGroupId(), ev.release(), id);
}
- ResolveDecommitContexts[id] = {ev, (ui32)queries.size()};
+
return;
}
}
@@ -477,30 +511,70 @@ namespace NKikimr::NBlobDepot {
Self->Execute(std::make_unique<TTxResolve>(Self, ev));
}
+ template<typename TCallback>
+ void TData::HandleResolutionResult(ui64 id, TCallback&& callback) {
+ auto& contexts = Self->Data->ResolveDecommitContexts;
+ if (const auto it = contexts.find(id); it != contexts.end()) {
+ auto& context = it->second;
+ if (!callback(context)) {
+ STLOG(PRI_INFO, BLOB_DEPOT, BDT87, "failing TEvResolve", (Id, Self->GetLogId()), (Sender, context.Ev->Sender),
+ (Cookie, context.Ev->Cookie));
+ auto [response, record] = TEvBlobDepot::MakeResponseFor(*context.Ev, NKikimrProto::ERROR,
+ "unassimilated key resolution error");
+ TActivationContext::Send(response.release());
+ } else if (!--context.NumRangesInFlight) {
+ auto ev = context.Ev;
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT88, "actually starting TEvResolve", (Id, Self->GetLogId()), (Sender, ev->Sender),
+ (Cookie, ev->Cookie));
+ std::sort(context.ResolutionErrors.begin(), context.ResolutionErrors.end());
+ Self->Execute(std::make_unique<TTxResolve>(Self, ev, std::move(context)));
+ } else {
+ return; // keep context running
+ }
+ contexts.erase(it);
+ }
+ }
+
void TData::Handle(TEvBlobStorage::TEvRangeResult::TPtr ev) {
auto& msg = *ev->Get();
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT50, "TEvRangeResult", (Id, Self->GetLogId()), (Msg, msg), (Cookie, ev->Cookie));
- auto& contexts = Self->Data->ResolveDecommitContexts;
- if (const auto it = contexts.find(ev->Cookie); it != contexts.end()) {
- auto& context = it->second;
-
+ HandleResolutionResult(ev->Cookie, [&](auto& context) {
if (msg.Status == NKikimrProto::OK) {
- for (const auto& response : msg.Responses) {
- if (LastAssimilatedBlobId < response.Id) {
- context.DecommitBlobs.push_back({response.Id, response.Keep, response.DoNotKeep});
- }
+ for (const auto& r : msg.Responses) {
+ context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep});
}
+ return true;
} else {
- context.Errors.emplace_back(msg.From, msg.To);
+ // we can't be sure that there are no still unassimilated keys in this range, so we report ERROR to
+ // the whole range query; TODO(alexvru): provide a way to mark only specific request item as faulty one
+ return false;
}
+ });
+ }
- if (!--context.NumRangesInFlight) {
- Self->Execute(std::make_unique<TTxResolve>(Self, context.Ev, std::move(context.DecommitBlobs),
- std::move(context.Errors)));
- contexts.erase(it);
- }
+ void TData::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
+ if (!ev->Cookie) {
+ return UncertaintyResolver->Handle(ev);
}
+
+ auto& msg = *ev->Get();
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT55, "TEvGetResult", (Id, Self->GetLogId()), (Msg, msg), (Cookie, ev->Cookie));
+
+ HandleResolutionResult(ev->Cookie, [&](auto& context) {
+ for (ui32 i = 0; i < msg.ResponseSz; ++i) {
+ const auto& r = msg.Responses[i];
+ if (r.Status == NKikimrProto::OK) {
+ context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep});
+ } else if (r.Status == NKikimrProto::NODATA) {
+ context.DropNodataBlobs.push_back(TKey(r.Id));
+ } else {
+ // mark this specific key as unresolvable
+ context.ResolutionErrors.push_back(TKey(r.Id));
+ }
+ }
+ return true;
+ });
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
index 11f86adae9..fbcf731216 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
@@ -26,8 +26,6 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
const bool Decommission;
TInstant StartTime;
- TAutoPtr<TEvBlobStorage::TEvRangeResult> Reply;
-
TMap<TLogoBlobID, TBlobStatusTracker> BlobStatus;
TBlobStorageGroupInfo::TGroupVDisks FailedDisks;
@@ -285,7 +283,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
Y_VERIFY(response.Id == BlobsToGet[i].BlobId);
if (getResult.Responses[i].Status == NKikimrProto::OK) {
- result->Responses.emplace_back(response.Id, response.Buffer);
+ result->Responses.emplace_back(response.Id, std::move(response.Buffer), response.Keep, response.DoNotKeep);
} else if (getResult.Responses[i].Status != NKikimrProto::NODATA || BlobsToGet[i].RequiredToBePresent) {
// it's okay to get NODATA if blob wasn't confirmed -- this blob is simply thrown out of resulting
// set; otherwise we return error about lost data