aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-09-19 10:44:42 +0300
committeralexvru <alexvru@ydb.tech>2022-09-19 10:44:42 +0300
commitb8ba0035bc558ad4ec20e5e24447a5f20a7ee221 (patch)
tree99b8065271debd7c13eb0f8e86bda05fb7b7ad82
parent1703ce1bceb11720104e0c3610edb0bc5100153c (diff)
downloadydb-b8ba0035bc558ad4ec20e5e24447a5f20a7ee221.tar.gz
Improve BlobDepot a bit
-rw-r--r--ydb/core/blob_depot/blocks.cpp2
-rw-r--r--ydb/core/blob_depot/data.cpp50
-rw-r--r--ydb/core/blob_depot/data_uncertain.cpp96
-rw-r--r--ydb/core/blob_depot/data_uncertain.h18
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp2
-rw-r--r--ydb/core/util/testactorsys.h15
6 files changed, 110 insertions, 73 deletions
diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp
index 8c6778f9918..9505929d943 100644
--- a/ydb/core/blob_depot/blocks.cpp
+++ b/ydb/core/blob_depot/blocks.cpp
@@ -132,7 +132,7 @@ namespace NKikimr::NBlobDepot {
void IssueBlocksToStorage() {
THashSet<ui32> processedGroups;
- for (const auto& [_, kind] : Self->ChannelKinds) { // FIXME: SIGSEGV here?
+ for (const auto& [_, kind] : Self->ChannelKinds) {
for (const auto& [channel, groupId] : kind.ChannelGroups) {
// FIXME: consider previous group generations (because agent can write in obsolete tablet generation)
// !!!!!!!!!!!
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index bd5e99f4049..8aac540989f 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -366,6 +366,7 @@ namespace NKikimr::NBlobDepot {
auto& channel = Self->Channels[record.Channel];
TGenStep nextGenStep(*--record.Trash.end());
+ std::set<TLogoBlobID>::iterator trashEndIter = record.Trash.end();
// step we are going to invalidate (including blobs with this one)
if (TGenStep(record.LeastExpectedBlobId) <= nextGenStep) {
@@ -403,50 +404,57 @@ namespace NKikimr::NBlobDepot {
// adjust the barrier to keep it safe now
const TLogoBlobID maxId(record.TabletId, record.LeastExpectedBlobId.Generation,
record.LeastExpectedBlobId.Step, record.Channel, 0, 0);
- const auto it = record.Trash.lower_bound(maxId);
- if (it != record.Trash.begin()) {
- nextGenStep = TGenStep(*std::prev(it));
+ trashEndIter = record.Trash.lower_bound(maxId);
+ if (trashEndIter != record.Trash.begin()) {
+ nextGenStep = TGenStep(*std::prev(trashEndIter));
} else {
nextGenStep = {};
}
}
- auto keep = std::make_unique<TVector<TLogoBlobID>>();
- auto doNotKeep = std::make_unique<TVector<TLogoBlobID>>();
+ TVector<TLogoBlobID> keep;
+ TVector<TLogoBlobID> doNotKeep;
+ std::vector<TLogoBlobID> trashInFlight;
- for (auto it = record.Trash.begin(); it != record.Trash.end() && TGenStep(*it) <= record.LastConfirmedGenStep; ++it) {
- doNotKeep->push_back(*it);
+ for (auto it = record.Trash.begin(); it != trashEndIter; ++it) {
+ if (const TGenStep genStep(*it); genStep <= record.LastConfirmedGenStep) {
+ doNotKeep.push_back(*it);
+ } else if (nextGenStep < genStep) {
+ Y_FAIL();
+ }
+ trashInFlight.push_back(*it);
}
const TLogoBlobID keepFrom(record.TabletId, record.LastConfirmedGenStep.Generation(),
- record.LastConfirmedGenStep.Step(), record.Channel, 0, 0);
+ record.LastConfirmedGenStep.Step(), record.Channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie,
+ TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode);
for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && TGenStep(*it) <= nextGenStep; ++it) {
- keep->push_back(*it);
+ Y_VERIFY(record.LastConfirmedGenStep < TGenStep(*it));
+ keep.push_back(*it);
}
- if (keep->empty()) {
- keep.reset();
- }
- if (doNotKeep->empty()) {
- doNotKeep.reset();
- }
const bool collect = nextGenStep > record.LastConfirmedGenStep;
- if (!keep && !doNotKeep && !collect) {
+ if (trashInFlight.empty()) {
+ Y_VERIFY(keep.empty());
continue; // nothing to do here
}
+ auto keep_ = keep ? std::make_unique<TVector<TLogoBlobID>>(std::move(keep)) : nullptr;
+ auto doNotKeep_ = doNotKeep ? std::make_unique<TVector<TLogoBlobID>>(std::move(doNotKeep)) : nullptr;
auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(record.TabletId, generation,
record.PerGenerationCounter, record.Channel, collect, nextGenStep.Generation(), nextGenStep.Step(),
- keep.get(), doNotKeep.get(), TInstant::Max(), true);
- keep.release();
- doNotKeep.release();
+ keep_.get(), doNotKeep_.get(), TInstant::Max(), true);
+ keep_.release();
+ doNotKeep_.release();
record.CollectGarbageRequestInFlight = true;
record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0;
- record.TrashInFlight.insert(record.TrashInFlight.end(), record.Trash.begin(), record.Trash.end());
+ record.TrashInFlight.swap(trashInFlight);
record.IssuedGenStep = Max(nextGenStep, record.LastConfirmedGenStep);
+ Y_VERIFY(trashInFlight.empty());
+
record.TIntrusiveListItem<TRecordsPerChannelGroup, TRecordWithTrash>::Unlink();
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "issuing TEvCollectGarbage", (Id, Self->GetLogId()),
@@ -488,6 +496,7 @@ namespace NKikimr::NBlobDepot {
ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}),
record.LastConfirmedGenStep);
} else {
+ record.TrashInFlight.clear();
record.ClearInFlight(this);
HandleTrash();
}
@@ -634,7 +643,6 @@ namespace NKikimr::NBlobDepot {
self->AccountBlob(id, false);
}
LastConfirmedGenStep = IssuedGenStep;
- EnqueueForCollectionIfPossible(self);
}
void TData::TRecordsPerChannelGroup::OnLeastExpectedBlobIdChange(TData *self, TBlobSeqId leastExpectedBlobId) {
diff --git a/ydb/core/blob_depot/data_uncertain.cpp b/ydb/core/blob_depot/data_uncertain.cpp
index 7ce56924b25..5474a98f833 100644
--- a/ydb/core/blob_depot/data_uncertain.cpp
+++ b/ydb/core/blob_depot/data_uncertain.cpp
@@ -41,7 +41,7 @@ namespace NKikimr::NBlobDepot {
void TData::TUncertaintyResolver::DropBlobs(const std::vector<TLogoBlobID>& blobIds) {
for (const TLogoBlobID& id : blobIds) {
- FinishBlob(id, false);
+ FinishBlob(id, EKeyBlobState::WASNT_WRITTEN);
}
}
@@ -49,31 +49,16 @@ namespace NKikimr::NBlobDepot {
FinishKey(key, false);
}
- void TData::TUncertaintyResolver::IssueIndexRestoreGetQuery(TKeys::value_type *keyRecord, TLogoBlobID id) {
- const auto [it, inserted] = Blobs.try_emplace(id);
- TBlobContext& blobContext = it->second;
-
- const bool inserted1 = blobContext.ReferringKeys.insert(keyRecord).second;
- Y_VERIFY(inserted1);
-
- const bool inserted2 = keyRecord->second.BlobQueriesInFlight.insert(id).second;
- Y_VERIFY(inserted2);
-
- if (inserted) {
- const ui32 groupId = Self->Info()->GroupFor(id.Channel(), id.Generation());
- SendToBSProxy(Self->SelfId(), groupId, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
- NKikimrBlobStorage::EGetHandleClass::FastRead, true, true));
- }
- }
-
void TData::TUncertaintyResolver::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
auto& msg = *ev->Get();
Y_VERIFY(msg.ResponseSz == 1);
auto& resp = msg.Responses[0];
- FinishBlob(resp.Id, resp.Status == NKikimrProto::OK);
+ FinishBlob(resp.Id, resp.Status == NKikimrProto::OK ? EKeyBlobState::CONFIRMED :
+ resp.Status == NKikimrProto::NODATA ? EKeyBlobState::WASNT_WRITTEN :
+ EKeyBlobState::ERROR);
}
- void TData::TUncertaintyResolver::FinishBlob(TLogoBlobID id, bool success) {
+ void TData::TUncertaintyResolver::FinishBlob(TLogoBlobID id, EKeyBlobState state) {
const auto blobIt = Blobs.find(id);
if (blobIt == Blobs.end()) {
return;
@@ -84,12 +69,9 @@ namespace NKikimr::NBlobDepot {
for (TKeys::value_type *keyRecord : blobContext.ReferringKeys) {
auto& [key, keyContext] = *keyRecord;
- const auto blobInFlightIt = keyContext.BlobQueriesInFlight.find(id);
- Y_VERIFY(blobInFlightIt != keyContext.BlobQueriesInFlight.end());
- auto blobInFlight = keyContext.BlobQueriesInFlight.extract(blobInFlightIt);
- if (success) {
- keyContext.ConfirmedBlobs.insert(std::move(blobInFlight));
- }
+ const auto blobStateIt = keyContext.BlobState.find(id);
+ Y_VERIFY(blobStateIt != keyContext.BlobState.end());
+ blobStateIt->second = state;
CheckAndFinishKeyIfPossible(keyRecord);
}
@@ -105,16 +87,40 @@ namespace NKikimr::NBlobDepot {
EnumerateBlobsForValueChain(value->ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) {
auto& [key, keyContext] = *keyRecord;
-
- if (keyContext.ConfirmedBlobs.contains(id)) {
- // this blob is in, okay
- } else if (keyContext.BlobQueriesInFlight.contains(id)) {
- // still have to wait for this one
- okay = false;
- } else {
- // have to additionally query this blob and wait for it
- okay = false;
- IssueIndexRestoreGetQuery(keyRecord, id);
+ switch (EKeyBlobState& state = keyContext.BlobState[id]) {
+ case EKeyBlobState::INITIAL: {
+ // have to additionally query this blob and wait for it
+ TBlobContext& blobContext = Blobs[id];
+ const bool inserted = blobContext.ReferringKeys.insert(keyRecord).second;
+ Y_VERIFY(inserted);
+ if (blobContext.ReferringKeys.size() == 1) {
+ const ui32 groupId = Self->Info()->GroupFor(id.Channel(), id.Generation());
+ SendToBSProxy(Self->SelfId(), groupId, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
+ NKikimrBlobStorage::EGetHandleClass::FastRead, true, true));
+ }
+
+ okay = false;
+ state = EKeyBlobState::QUERY_IN_FLIGHT;
+ break;
+ }
+
+ case EKeyBlobState::QUERY_IN_FLIGHT:
+ // still have to wait for this one
+ okay = false;
+ break;
+
+ case EKeyBlobState::CONFIRMED:
+ // blob was found and it is ok
+ break;
+
+ case EKeyBlobState::WASNT_WRITTEN:
+ // the blob hasn't been written completely; this may also be a race when it is being written
+ // right now, but we are asking for the data too early (like in scan request)
+ break;
+
+ case EKeyBlobState::ERROR:
+ // we can't figure out this blob's state
+ break;
}
});
} else { // key has been deleted, we have to drop it from the response
@@ -146,14 +152,16 @@ namespace NKikimr::NBlobDepot {
}
}
- for (const TLogoBlobID& id : keyContext.BlobQueriesInFlight) {
- const auto blobIt = Blobs.find(id);
- Y_VERIFY(blobIt != Blobs.end());
- TBlobContext& blobContext = blobIt->second;
- const size_t numErased = blobContext.ReferringKeys.erase(&*keyIt);
- Y_VERIFY(numErased == 1);
- if (blobContext.ReferringKeys.empty()) {
- Blobs.erase(blobIt);
+ for (const auto& [id, state] : keyContext.BlobState) {
+ if (state == EKeyBlobState::QUERY_IN_FLIGHT) {
+ const auto blobIt = Blobs.find(id);
+ Y_VERIFY(blobIt != Blobs.end());
+ TBlobContext& blobContext = blobIt->second;
+ const size_t numErased = blobContext.ReferringKeys.erase(&*keyIt);
+ Y_VERIFY(numErased == 1);
+ if (blobContext.ReferringKeys.empty()) {
+ Blobs.erase(blobIt);
+ }
}
}
}
diff --git a/ydb/core/blob_depot/data_uncertain.h b/ydb/core/blob_depot/data_uncertain.h
index 8f461573ad7..66ca3a7ab6d 100644
--- a/ydb/core/blob_depot/data_uncertain.h
+++ b/ydb/core/blob_depot/data_uncertain.h
@@ -22,15 +22,20 @@ namespace NKikimr::NBlobDepot {
{}
};
+ enum class EKeyBlobState {
+ INITIAL, // just created blob, no activity
+ QUERY_IN_FLIGHT, // blob should have BlobContext referring to this key too
+ CONFIRMED, // we got OK for this blob
+ WASNT_WRITTEN, // we got NODATA for this blob, this key needs to be deleted if possible
+ ERROR, // we got ERROR or any other reply for this blob
+ };
+
struct TKeyContext {
// requests dependent on this key
std::vector<TIntrusivePtr<TResolveOnHold>> DependentRequests;
- // blob queries involved
- std::set<TLogoBlobID> BlobQueriesInFlight;
-
- // found blobs
- std::set<TLogoBlobID> ConfirmedBlobs;
+ // blob queries issued and replied
+ std::unordered_map<TLogoBlobID, EKeyBlobState> BlobState;
};
using TKeys = std::map<TKey, TKeyContext>;
@@ -52,8 +57,7 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobStorage::TEvGetResult::TPtr ev);
private:
- void IssueIndexRestoreGetQuery(TKeys::value_type *keyRecord, TLogoBlobID id);
- void FinishBlob(TLogoBlobID id, bool success);
+ void FinishBlob(TLogoBlobID id, EKeyBlobState state);
void CheckAndFinishKeyIfPossible(TKeys::value_type *keyRecord);
void FinishKey(const TKey& key, bool success);
};
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index a76ce189bcf..972b6dd6779 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -140,6 +140,8 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "TEvDiscardSpoiledBlobSeq", (Id, GetLogId()), (AgentId, agent.ConnectedNodeId),
(Msg, ev->Get()->Record));
+ // FIXME(alexvru): delete uncertain keys containing this BlobSeqId as they were never written
+
for (const auto& item : ev->Get()->Record.GetItems()) {
const auto blobSeqId = TBlobSeqId::FromProto(item);
if (blobSeqId.Generation == generation) {
diff --git a/ydb/core/util/testactorsys.h b/ydb/core/util/testactorsys.h
index bd3976c79e1..ffae02033f9 100644
--- a/ydb/core/util/testactorsys.h
+++ b/ydb/core/util/testactorsys.h
@@ -203,6 +203,13 @@ public:
return &AppData;
}
+ template<typename T>
+ void EnumActors(T&& callback) {
+ for (const auto& [actor, _] : ActorName) {
+ callback(actor);
+ }
+ }
+
void Start() {
for (auto& [nodeId, info] : PerNodeInfo) {
SetupNode(nodeId, info);
@@ -460,6 +467,14 @@ public:
GetNode(nodeId)->ActorSystem->RegisterLocalService(serviceId, actorId);
}
+ bool HasImmediateEvents() const {
+ if (ScheduleQ.empty()) {
+ return false;
+ }
+ const auto& [ts, _] = *ScheduleQ.begin();
+ return ts <= Clock;
+ }
+
template<typename TCallback>
void Sim(TCallback&& callback, std::function<void(IEventHandle&)> witness = {}) {
bool progress = true;