diff options
author | alexvru <alexvru@ydb.tech> | 2022-08-30 22:07:22 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-08-30 22:07:22 +0300 |
commit | 9ef050cd54a801a2e844bced332b8aaba789f14a (patch) | |
tree | 6bd0973df85f57f7cc974902702a92e29af76be0 | |
parent | d5e6e9e495002c513ed04fa2b6db97f05c7995fa (diff) | |
download | ydb-9ef050cd54a801a2e844bced332b8aaba789f14a.tar.gz |
BlobDepot work in progress
-rw-r--r-- | ydb/core/blob_depot/assimilator.cpp | 65 | ||||
-rw-r--r-- | ydb/core/blob_depot/assimilator.h | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 7 |
3 files changed, 62 insertions, 14 deletions
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index 84dae88a4d..450d415f50 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -61,6 +61,7 @@ namespace NKikimr::NBlobDepot { hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); hFunc(TEvBlobStorage::TEvControllerGroupDecommittedResponse, Handle); cFunc(TEvPrivate::EvResume, Action); + cFunc(TEvPrivate::EvTxComplete, HandleTxComplete); cFunc(TEvents::TSystem::Poison, PassAway); default: @@ -216,7 +217,6 @@ namespace NKikimr::NBlobDepot { } void TAssimilator::ScanDataForCopying() { - const bool fromTheBeginning = !LastScannedKey; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT54, "TAssimilator::ScanDataForCopying", (Id, Self->GetLogId()), (LastScannedKey, LastScannedKey)); @@ -231,8 +231,16 @@ namespace NKikimr::NBlobDepot { }; std::deque<TScanQueueItem> scanQ; ui32 totalSize = 0; + THPTimer timer; + ui32 numItems = 0; auto callback = [&](const TData::TKey& key, const TData::TValue& value) { + if (++numItems == 1000) { + numItems = 0; + if (TDuration::Seconds(timer.Passed()) >= TDuration::MilliSeconds(1)) { + return false; + } + } if (!value.OriginalBlobId) { LastScannedKey.emplace(key.GetBlobId()); return true; // keep scanning @@ -241,7 +249,7 @@ namespace NKikimr::NBlobDepot { LastScannedKey.emplace(key.GetBlobId()); scanQ.push_back({.Key = *LastScannedKey, .OriginalBlobId = id}); totalSize += id.BlobSize(); - NeedfulBlobs.insert(id); + EntriesToProcess = true; return totalSize < MaxSizeToQuery; } else { return false; // a blob belonging to different tablet @@ -249,7 +257,11 @@ namespace NKikimr::NBlobDepot { }; // FIXME: reentrable as it shares mailbox with the BlobDepot tablet itself - Self->Data->ScanRange(LastScannedKey ? &lastScannedKey : nullptr, nullptr, {}, callback); + const bool finished = Self->Data->ScanRange(LastScannedKey ? &lastScannedKey : nullptr, nullptr, {}, callback); + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT56, "ScanDataForCopying done", (Id, Self->GetLogId()), + (LastScannedKey, LastScannedKey), (ScanQ.size, scanQ.size()), (EntriesToProcess, EntriesToProcess), + (Finished, finished)); if (!scanQ.empty()) { using TQuery = TEvBlobStorage::TEvGet::TQuery; @@ -263,16 +275,44 @@ namespace NKikimr::NBlobDepot { auto ev = std::make_unique<TEvBlobStorage::TEvGet>(queries, sz, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead); ev->Decommission = true; SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), ev.release()); - } else if (fromTheBeginning) { + } else if (!finished) { // timeout hit, reschedule work + TActivationContext::Send(new IEventHandle(TEvPrivate::EvResume, 0, SelfId(), {}, nullptr, 0)); + } else if (!EntriesToProcess) { // we have finished scanning the whole table without any entries, copying is done OnCopyDone(); - } else { - // restart the scan from the beginning and find other keys to copy or finish it + } else { // we have finished scanning, but we have replicated some data, restart scanning to ensure that nothing left LastScannedKey.reset(); + EntriesToProcess = false; TActivationContext::Send(new IEventHandle(TEvPrivate::EvResume, 0, SelfId(), {}, nullptr, 0)); } } void TAssimilator::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { + class TTxDropBlobIfNoData : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + const TLogoBlobID Id; + const TActorId AssimilatorId; + + public: + TTxDropBlobIfNoData(TBlobDepot *self, TLogoBlobID id, TActorId assimilatorId) + : TTransactionBase(self) + , Id(id) + , AssimilatorId(assimilatorId) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + const TData::TKey key(Id); + if (const TData::TValue *v = Self->Data->FindKey(key); v && v->OriginalBlobId && + v->KeepState != NKikimrBlobDepot::EKeepState::Keep) { + Self->Data->DeleteKey(key, txc, this); + } + return true; + } + + void Complete(const TActorContext&) override { + Self->Data->CommitTrash(this); + TActivationContext::Send(new IEventHandle(TEvPrivate::EvTxComplete, 0, AssimilatorId, {}, nullptr, 0)); + } + }; + auto& msg = *ev->Get(); for (ui32 i = 0; i < msg.ResponseSz; ++i) { auto& resp = msg.Responses[i]; @@ -283,6 +323,9 @@ namespace NKikimr::NBlobDepot { ev->Decommission = true; SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), ev.release()); ++NumPutsInFlight; + } else if (resp.Status == NKikimrProto::NODATA) { + Self->Execute(std::make_unique<TTxDropBlobIfNoData>(Self, resp.Id, SelfId())); + ++NumPutsInFlight; } } if (!NumPutsInFlight) { @@ -290,14 +333,16 @@ namespace NKikimr::NBlobDepot { } } + void TAssimilator::HandleTxComplete() { + if (!--NumPutsInFlight) { + Action(); + } + } + void TAssimilator::Handle(TEvBlobStorage::TEvPutResult::TPtr ev) { auto& msg = *ev->Get(); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "got TEvPutResult", (Id, Self->GetLogId()), (Msg, msg), (NumPutsInFlight, NumPutsInFlight)); - if (msg.Status == NKikimrProto::OK) { - const size_t numErased = NeedfulBlobs.erase(msg.Id); - Y_VERIFY(numErased == 1); - } if (!--NumPutsInFlight) { IssueCollects(); } diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h index aa6d4088d2..385d4ce9e9 100644 --- a/ydb/core/blob_depot/assimilator.h +++ b/ydb/core/blob_depot/assimilator.h @@ -9,6 +9,7 @@ namespace NKikimr::NBlobDepot { struct TEvPrivate { enum { EvResume = EventSpaceBegin(TEvents::ES_PRIVATE), + EvTxComplete, }; }; @@ -20,7 +21,7 @@ namespace NKikimr::NBlobDepot { std::optional<TLogoBlobID> SkipBlobsUpTo; std::optional<TLogoBlobID> LastScannedKey; - std::set<TLogoBlobID> NeedfulBlobs; // in current tablet, original blob ids + bool EntriesToProcess = false; static constexpr ui32 MaxSizeToQuery = 10'000'000; @@ -47,6 +48,7 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev); void ScanDataForCopying(); void Handle(TEvBlobStorage::TEvGetResult::TPtr ev); + void HandleTxComplete(); void Handle(TEvBlobStorage::TEvPutResult::TPtr ev); void IssueCollects(); void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev); diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index d94b8e6a31..619506dcf0 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -370,7 +370,7 @@ namespace NKikimr::NBlobDepot { {} template<typename TCallback> - void ScanRange(const TKey *begin, const TKey *end, TScanFlags flags, TCallback&& callback) { + bool ScanRange(const TKey *begin, const TKey *end, TScanFlags flags, TCallback&& callback) { auto beginIt = !begin ? Data.begin() : flags & EScanFlags::INCLUDE_BEGIN ? Data.lower_bound(*begin) : Data.upper_bound(*begin); @@ -385,7 +385,7 @@ namespace NKikimr::NBlobDepot { do { auto& current = *endIt--; if (!callback(current.first, current.second)) { - break; + return false; } } while (beginIt != endIt); } @@ -393,10 +393,11 @@ namespace NKikimr::NBlobDepot { while (beginIt != endIt) { auto& current = *beginIt++; if (!callback(current.first, current.second)) { - break; + return false; } } } + return true; } const TValue *FindKey(const TKey& key) const; |