diff options
author | alexvru <alexvru@ydb.tech> | 2022-09-19 10:44:42 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-09-19 10:44:42 +0300 |
commit | b8ba0035bc558ad4ec20e5e24447a5f20a7ee221 (patch) | |
tree | 99b8065271debd7c13eb0f8e86bda05fb7b7ad82 | |
parent | 1703ce1bceb11720104e0c3610edb0bc5100153c (diff) | |
download | ydb-b8ba0035bc558ad4ec20e5e24447a5f20a7ee221.tar.gz |
Improve BlobDepot a bit
-rw-r--r-- | ydb/core/blob_depot/blocks.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 50 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_uncertain.cpp | 96 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_uncertain.h | 18 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_commit_blob_seq.cpp | 2 | ||||
-rw-r--r-- | ydb/core/util/testactorsys.h | 15 |
6 files changed, 110 insertions, 73 deletions
diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index 8c6778f9918..9505929d943 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -132,7 +132,7 @@ namespace NKikimr::NBlobDepot { void IssueBlocksToStorage() { THashSet<ui32> processedGroups; - for (const auto& [_, kind] : Self->ChannelKinds) { // FIXME: SIGSEGV here? + for (const auto& [_, kind] : Self->ChannelKinds) { for (const auto& [channel, groupId] : kind.ChannelGroups) { // FIXME: consider previous group generations (because agent can write in obsolete tablet generation) // !!!!!!!!!!! diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index bd5e99f4049..8aac540989f 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -366,6 +366,7 @@ namespace NKikimr::NBlobDepot { auto& channel = Self->Channels[record.Channel]; TGenStep nextGenStep(*--record.Trash.end()); + std::set<TLogoBlobID>::iterator trashEndIter = record.Trash.end(); // step we are going to invalidate (including blobs with this one) if (TGenStep(record.LeastExpectedBlobId) <= nextGenStep) { @@ -403,50 +404,57 @@ namespace NKikimr::NBlobDepot { // adjust the barrier to keep it safe now const TLogoBlobID maxId(record.TabletId, record.LeastExpectedBlobId.Generation, record.LeastExpectedBlobId.Step, record.Channel, 0, 0); - const auto it = record.Trash.lower_bound(maxId); - if (it != record.Trash.begin()) { - nextGenStep = TGenStep(*std::prev(it)); + trashEndIter = record.Trash.lower_bound(maxId); + if (trashEndIter != record.Trash.begin()) { + nextGenStep = TGenStep(*std::prev(trashEndIter)); } else { nextGenStep = {}; } } - auto keep = std::make_unique<TVector<TLogoBlobID>>(); - auto doNotKeep = std::make_unique<TVector<TLogoBlobID>>(); + TVector<TLogoBlobID> keep; + TVector<TLogoBlobID> doNotKeep; + std::vector<TLogoBlobID> trashInFlight; - for (auto it = record.Trash.begin(); it != record.Trash.end() && TGenStep(*it) <= record.LastConfirmedGenStep; ++it) { - doNotKeep->push_back(*it); + for (auto it = record.Trash.begin(); it != trashEndIter; ++it) { + if (const TGenStep genStep(*it); genStep <= record.LastConfirmedGenStep) { + doNotKeep.push_back(*it); + } else if (nextGenStep < genStep) { + Y_FAIL(); + } + trashInFlight.push_back(*it); } const TLogoBlobID keepFrom(record.TabletId, record.LastConfirmedGenStep.Generation(), - record.LastConfirmedGenStep.Step(), record.Channel, 0, 0); + record.LastConfirmedGenStep.Step(), record.Channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, + TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode); for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && TGenStep(*it) <= nextGenStep; ++it) { - keep->push_back(*it); + Y_VERIFY(record.LastConfirmedGenStep < TGenStep(*it)); + keep.push_back(*it); } - if (keep->empty()) { - keep.reset(); - } - if (doNotKeep->empty()) { - doNotKeep.reset(); - } const bool collect = nextGenStep > record.LastConfirmedGenStep; - if (!keep && !doNotKeep && !collect) { + if (trashInFlight.empty()) { + Y_VERIFY(keep.empty()); continue; // nothing to do here } + auto keep_ = keep ? std::make_unique<TVector<TLogoBlobID>>(std::move(keep)) : nullptr; + auto doNotKeep_ = doNotKeep ? std::make_unique<TVector<TLogoBlobID>>(std::move(doNotKeep)) : nullptr; auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(record.TabletId, generation, record.PerGenerationCounter, record.Channel, collect, nextGenStep.Generation(), nextGenStep.Step(), - keep.get(), doNotKeep.get(), TInstant::Max(), true); - keep.release(); - doNotKeep.release(); + keep_.get(), doNotKeep_.get(), TInstant::Max(), true); + keep_.release(); + doNotKeep_.release(); record.CollectGarbageRequestInFlight = true; record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0; - record.TrashInFlight.insert(record.TrashInFlight.end(), record.Trash.begin(), record.Trash.end()); + record.TrashInFlight.swap(trashInFlight); record.IssuedGenStep = Max(nextGenStep, record.LastConfirmedGenStep); + Y_VERIFY(trashInFlight.empty()); + record.TIntrusiveListItem<TRecordsPerChannelGroup, TRecordWithTrash>::Unlink(); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "issuing TEvCollectGarbage", (Id, Self->GetLogId()), @@ -488,6 +496,7 @@ namespace NKikimr::NBlobDepot { ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}), record.LastConfirmedGenStep); } else { + record.TrashInFlight.clear(); record.ClearInFlight(this); HandleTrash(); } @@ -634,7 +643,6 @@ namespace NKikimr::NBlobDepot { self->AccountBlob(id, false); } LastConfirmedGenStep = IssuedGenStep; - EnqueueForCollectionIfPossible(self); } void TData::TRecordsPerChannelGroup::OnLeastExpectedBlobIdChange(TData *self, TBlobSeqId leastExpectedBlobId) { diff --git a/ydb/core/blob_depot/data_uncertain.cpp b/ydb/core/blob_depot/data_uncertain.cpp index 7ce56924b25..5474a98f833 100644 --- a/ydb/core/blob_depot/data_uncertain.cpp +++ b/ydb/core/blob_depot/data_uncertain.cpp @@ -41,7 +41,7 @@ namespace NKikimr::NBlobDepot { void TData::TUncertaintyResolver::DropBlobs(const std::vector<TLogoBlobID>& blobIds) { for (const TLogoBlobID& id : blobIds) { - FinishBlob(id, false); + FinishBlob(id, EKeyBlobState::WASNT_WRITTEN); } } @@ -49,31 +49,16 @@ namespace NKikimr::NBlobDepot { FinishKey(key, false); } - void TData::TUncertaintyResolver::IssueIndexRestoreGetQuery(TKeys::value_type *keyRecord, TLogoBlobID id) { - const auto [it, inserted] = Blobs.try_emplace(id); - TBlobContext& blobContext = it->second; - - const bool inserted1 = blobContext.ReferringKeys.insert(keyRecord).second; - Y_VERIFY(inserted1); - - const bool inserted2 = keyRecord->second.BlobQueriesInFlight.insert(id).second; - Y_VERIFY(inserted2); - - if (inserted) { - const ui32 groupId = Self->Info()->GroupFor(id.Channel(), id.Generation()); - SendToBSProxy(Self->SelfId(), groupId, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(), - NKikimrBlobStorage::EGetHandleClass::FastRead, true, true)); - } - } - void TData::TUncertaintyResolver::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { auto& msg = *ev->Get(); Y_VERIFY(msg.ResponseSz == 1); auto& resp = msg.Responses[0]; - FinishBlob(resp.Id, resp.Status == NKikimrProto::OK); + FinishBlob(resp.Id, resp.Status == NKikimrProto::OK ? EKeyBlobState::CONFIRMED : + resp.Status == NKikimrProto::NODATA ? EKeyBlobState::WASNT_WRITTEN : + EKeyBlobState::ERROR); } - void TData::TUncertaintyResolver::FinishBlob(TLogoBlobID id, bool success) { + void TData::TUncertaintyResolver::FinishBlob(TLogoBlobID id, EKeyBlobState state) { const auto blobIt = Blobs.find(id); if (blobIt == Blobs.end()) { return; @@ -84,12 +69,9 @@ namespace NKikimr::NBlobDepot { for (TKeys::value_type *keyRecord : blobContext.ReferringKeys) { auto& [key, keyContext] = *keyRecord; - const auto blobInFlightIt = keyContext.BlobQueriesInFlight.find(id); - Y_VERIFY(blobInFlightIt != keyContext.BlobQueriesInFlight.end()); - auto blobInFlight = keyContext.BlobQueriesInFlight.extract(blobInFlightIt); - if (success) { - keyContext.ConfirmedBlobs.insert(std::move(blobInFlight)); - } + const auto blobStateIt = keyContext.BlobState.find(id); + Y_VERIFY(blobStateIt != keyContext.BlobState.end()); + blobStateIt->second = state; CheckAndFinishKeyIfPossible(keyRecord); } @@ -105,16 +87,40 @@ namespace NKikimr::NBlobDepot { EnumerateBlobsForValueChain(value->ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) { auto& [key, keyContext] = *keyRecord; - - if (keyContext.ConfirmedBlobs.contains(id)) { - // this blob is in, okay - } else if (keyContext.BlobQueriesInFlight.contains(id)) { - // still have to wait for this one - okay = false; - } else { - // have to additionally query this blob and wait for it - okay = false; - IssueIndexRestoreGetQuery(keyRecord, id); + switch (EKeyBlobState& state = keyContext.BlobState[id]) { + case EKeyBlobState::INITIAL: { + // have to additionally query this blob and wait for it + TBlobContext& blobContext = Blobs[id]; + const bool inserted = blobContext.ReferringKeys.insert(keyRecord).second; + Y_VERIFY(inserted); + if (blobContext.ReferringKeys.size() == 1) { + const ui32 groupId = Self->Info()->GroupFor(id.Channel(), id.Generation()); + SendToBSProxy(Self->SelfId(), groupId, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(), + NKikimrBlobStorage::EGetHandleClass::FastRead, true, true)); + } + + okay = false; + state = EKeyBlobState::QUERY_IN_FLIGHT; + break; + } + + case EKeyBlobState::QUERY_IN_FLIGHT: + // still have to wait for this one + okay = false; + break; + + case EKeyBlobState::CONFIRMED: + // blob was found and it is ok + break; + + case EKeyBlobState::WASNT_WRITTEN: + // the blob hasn't been written completely; this may also be a race when it is being written + // right now, but we are asking for the data too early (like in scan request) + break; + + case EKeyBlobState::ERROR: + // we can't figure out this blob's state + break; } }); } else { // key has been deleted, we have to drop it from the response @@ -146,14 +152,16 @@ namespace NKikimr::NBlobDepot { } } - for (const TLogoBlobID& id : keyContext.BlobQueriesInFlight) { - const auto blobIt = Blobs.find(id); - Y_VERIFY(blobIt != Blobs.end()); - TBlobContext& blobContext = blobIt->second; - const size_t numErased = blobContext.ReferringKeys.erase(&*keyIt); - Y_VERIFY(numErased == 1); - if (blobContext.ReferringKeys.empty()) { - Blobs.erase(blobIt); + for (const auto& [id, state] : keyContext.BlobState) { + if (state == EKeyBlobState::QUERY_IN_FLIGHT) { + const auto blobIt = Blobs.find(id); + Y_VERIFY(blobIt != Blobs.end()); + TBlobContext& blobContext = blobIt->second; + const size_t numErased = blobContext.ReferringKeys.erase(&*keyIt); + Y_VERIFY(numErased == 1); + if (blobContext.ReferringKeys.empty()) { + Blobs.erase(blobIt); + } } } } diff --git a/ydb/core/blob_depot/data_uncertain.h b/ydb/core/blob_depot/data_uncertain.h index 8f461573ad7..66ca3a7ab6d 100644 --- a/ydb/core/blob_depot/data_uncertain.h +++ b/ydb/core/blob_depot/data_uncertain.h @@ -22,15 +22,20 @@ namespace NKikimr::NBlobDepot { {} }; + enum class EKeyBlobState { + INITIAL, // just created blob, no activity + QUERY_IN_FLIGHT, // blob should have BlobContext referring to this key too + CONFIRMED, // we got OK for this blob + WASNT_WRITTEN, // we got NODATA for this blob, this key needs to be deleted if possible + ERROR, // we got ERROR or any other reply for this blob + }; + struct TKeyContext { // requests dependent on this key std::vector<TIntrusivePtr<TResolveOnHold>> DependentRequests; - // blob queries involved - std::set<TLogoBlobID> BlobQueriesInFlight; - - // found blobs - std::set<TLogoBlobID> ConfirmedBlobs; + // blob queries issued and replied + std::unordered_map<TLogoBlobID, EKeyBlobState> BlobState; }; using TKeys = std::map<TKey, TKeyContext>; @@ -52,8 +57,7 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobStorage::TEvGetResult::TPtr ev); private: - void IssueIndexRestoreGetQuery(TKeys::value_type *keyRecord, TLogoBlobID id); - void FinishBlob(TLogoBlobID id, bool success); + void FinishBlob(TLogoBlobID id, EKeyBlobState state); void CheckAndFinishKeyIfPossible(TKeys::value_type *keyRecord); void FinishKey(const TKey& key, bool success); }; diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index a76ce189bcf..972b6dd6779 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -140,6 +140,8 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "TEvDiscardSpoiledBlobSeq", (Id, GetLogId()), (AgentId, agent.ConnectedNodeId), (Msg, ev->Get()->Record)); + // FIXME(alexvru): delete uncertain keys containing this BlobSeqId as they were never written + for (const auto& item : ev->Get()->Record.GetItems()) { const auto blobSeqId = TBlobSeqId::FromProto(item); if (blobSeqId.Generation == generation) { diff --git a/ydb/core/util/testactorsys.h b/ydb/core/util/testactorsys.h index bd3976c79e1..ffae02033f9 100644 --- a/ydb/core/util/testactorsys.h +++ b/ydb/core/util/testactorsys.h @@ -203,6 +203,13 @@ public: return &AppData; } + template<typename T> + void EnumActors(T&& callback) { + for (const auto& [actor, _] : ActorName) { + callback(actor); + } + } + void Start() { for (auto& [nodeId, info] : PerNodeInfo) { SetupNode(nodeId, info); @@ -460,6 +467,14 @@ public: GetNode(nodeId)->ActorSystem->RegisterLocalService(serviceId, actorId); } + bool HasImmediateEvents() const { + if (ScheduleQ.empty()) { + return false; + } + const auto& [ts, _] = *ScheduleQ.begin(); + return ts <= Clock; + } + template<typename TCallback> void Sim(TCallback&& callback, std::function<void(IEventHandle&)> witness = {}) { bool progress = true; |