aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-08-30 22:07:22 +0300
committeralexvru <alexvru@ydb.tech>2022-08-30 22:07:22 +0300
commit9ef050cd54a801a2e844bced332b8aaba789f14a (patch)
tree6bd0973df85f57f7cc974902702a92e29af76be0
parentd5e6e9e495002c513ed04fa2b6db97f05c7995fa (diff)
downloadydb-9ef050cd54a801a2e844bced332b8aaba789f14a.tar.gz
BlobDepot work in progress
-rw-r--r--ydb/core/blob_depot/assimilator.cpp65
-rw-r--r--ydb/core/blob_depot/assimilator.h4
-rw-r--r--ydb/core/blob_depot/data.h7
3 files changed, 62 insertions, 14 deletions
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp
index 84dae88a4d..450d415f50 100644
--- a/ydb/core/blob_depot/assimilator.cpp
+++ b/ydb/core/blob_depot/assimilator.cpp
@@ -61,6 +61,7 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
hFunc(TEvBlobStorage::TEvControllerGroupDecommittedResponse, Handle);
cFunc(TEvPrivate::EvResume, Action);
+ cFunc(TEvPrivate::EvTxComplete, HandleTxComplete);
cFunc(TEvents::TSystem::Poison, PassAway);
default:
@@ -216,7 +217,6 @@ namespace NKikimr::NBlobDepot {
}
void TAssimilator::ScanDataForCopying() {
- const bool fromTheBeginning = !LastScannedKey;
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT54, "TAssimilator::ScanDataForCopying", (Id, Self->GetLogId()),
(LastScannedKey, LastScannedKey));
@@ -231,8 +231,16 @@ namespace NKikimr::NBlobDepot {
};
std::deque<TScanQueueItem> scanQ;
ui32 totalSize = 0;
+ THPTimer timer;
+ ui32 numItems = 0;
auto callback = [&](const TData::TKey& key, const TData::TValue& value) {
+ if (++numItems == 1000) {
+ numItems = 0;
+ if (TDuration::Seconds(timer.Passed()) >= TDuration::MilliSeconds(1)) {
+ return false;
+ }
+ }
if (!value.OriginalBlobId) {
LastScannedKey.emplace(key.GetBlobId());
return true; // keep scanning
@@ -241,7 +249,7 @@ namespace NKikimr::NBlobDepot {
LastScannedKey.emplace(key.GetBlobId());
scanQ.push_back({.Key = *LastScannedKey, .OriginalBlobId = id});
totalSize += id.BlobSize();
- NeedfulBlobs.insert(id);
+ EntriesToProcess = true;
return totalSize < MaxSizeToQuery;
} else {
return false; // a blob belonging to different tablet
@@ -249,7 +257,11 @@ namespace NKikimr::NBlobDepot {
};
// FIXME: reentrable as it shares mailbox with the BlobDepot tablet itself
- Self->Data->ScanRange(LastScannedKey ? &lastScannedKey : nullptr, nullptr, {}, callback);
+ const bool finished = Self->Data->ScanRange(LastScannedKey ? &lastScannedKey : nullptr, nullptr, {}, callback);
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT56, "ScanDataForCopying done", (Id, Self->GetLogId()),
+ (LastScannedKey, LastScannedKey), (ScanQ.size, scanQ.size()), (EntriesToProcess, EntriesToProcess),
+ (Finished, finished));
if (!scanQ.empty()) {
using TQuery = TEvBlobStorage::TEvGet::TQuery;
@@ -263,16 +275,44 @@ namespace NKikimr::NBlobDepot {
auto ev = std::make_unique<TEvBlobStorage::TEvGet>(queries, sz, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead);
ev->Decommission = true;
SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), ev.release());
- } else if (fromTheBeginning) {
+ } else if (!finished) { // timeout hit, reschedule work
+ TActivationContext::Send(new IEventHandle(TEvPrivate::EvResume, 0, SelfId(), {}, nullptr, 0));
+ } else if (!EntriesToProcess) { // we have finished scanning the whole table without any entries, copying is done
OnCopyDone();
- } else {
- // restart the scan from the beginning and find other keys to copy or finish it
+ } else { // we have finished scanning, but we have replicated some data, restart scanning to ensure that nothing left
LastScannedKey.reset();
+ EntriesToProcess = false;
TActivationContext::Send(new IEventHandle(TEvPrivate::EvResume, 0, SelfId(), {}, nullptr, 0));
}
}
void TAssimilator::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
+ class TTxDropBlobIfNoData : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ const TLogoBlobID Id;
+ const TActorId AssimilatorId;
+
+ public:
+ TTxDropBlobIfNoData(TBlobDepot *self, TLogoBlobID id, TActorId assimilatorId)
+ : TTransactionBase(self)
+ , Id(id)
+ , AssimilatorId(assimilatorId)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ const TData::TKey key(Id);
+ if (const TData::TValue *v = Self->Data->FindKey(key); v && v->OriginalBlobId &&
+ v->KeepState != NKikimrBlobDepot::EKeepState::Keep) {
+ Self->Data->DeleteKey(key, txc, this);
+ }
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ Self->Data->CommitTrash(this);
+ TActivationContext::Send(new IEventHandle(TEvPrivate::EvTxComplete, 0, AssimilatorId, {}, nullptr, 0));
+ }
+ };
+
auto& msg = *ev->Get();
for (ui32 i = 0; i < msg.ResponseSz; ++i) {
auto& resp = msg.Responses[i];
@@ -283,6 +323,9 @@ namespace NKikimr::NBlobDepot {
ev->Decommission = true;
SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), ev.release());
++NumPutsInFlight;
+ } else if (resp.Status == NKikimrProto::NODATA) {
+ Self->Execute(std::make_unique<TTxDropBlobIfNoData>(Self, resp.Id, SelfId()));
+ ++NumPutsInFlight;
}
}
if (!NumPutsInFlight) {
@@ -290,14 +333,16 @@ namespace NKikimr::NBlobDepot {
}
}
+ void TAssimilator::HandleTxComplete() {
+ if (!--NumPutsInFlight) {
+ Action();
+ }
+ }
+
void TAssimilator::Handle(TEvBlobStorage::TEvPutResult::TPtr ev) {
auto& msg = *ev->Get();
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "got TEvPutResult", (Id, Self->GetLogId()), (Msg, msg),
(NumPutsInFlight, NumPutsInFlight));
- if (msg.Status == NKikimrProto::OK) {
- const size_t numErased = NeedfulBlobs.erase(msg.Id);
- Y_VERIFY(numErased == 1);
- }
if (!--NumPutsInFlight) {
IssueCollects();
}
diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h
index aa6d4088d2..385d4ce9e9 100644
--- a/ydb/core/blob_depot/assimilator.h
+++ b/ydb/core/blob_depot/assimilator.h
@@ -9,6 +9,7 @@ namespace NKikimr::NBlobDepot {
struct TEvPrivate {
enum {
EvResume = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvTxComplete,
};
};
@@ -20,7 +21,7 @@ namespace NKikimr::NBlobDepot {
std::optional<TLogoBlobID> SkipBlobsUpTo;
std::optional<TLogoBlobID> LastScannedKey;
- std::set<TLogoBlobID> NeedfulBlobs; // in current tablet, original blob ids
+ bool EntriesToProcess = false;
static constexpr ui32 MaxSizeToQuery = 10'000'000;
@@ -47,6 +48,7 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev);
void ScanDataForCopying();
void Handle(TEvBlobStorage::TEvGetResult::TPtr ev);
+ void HandleTxComplete();
void Handle(TEvBlobStorage::TEvPutResult::TPtr ev);
void IssueCollects();
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev);
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index d94b8e6a31..619506dcf0 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -370,7 +370,7 @@ namespace NKikimr::NBlobDepot {
{}
template<typename TCallback>
- void ScanRange(const TKey *begin, const TKey *end, TScanFlags flags, TCallback&& callback) {
+ bool ScanRange(const TKey *begin, const TKey *end, TScanFlags flags, TCallback&& callback) {
auto beginIt = !begin ? Data.begin()
: flags & EScanFlags::INCLUDE_BEGIN ? Data.lower_bound(*begin)
: Data.upper_bound(*begin);
@@ -385,7 +385,7 @@ namespace NKikimr::NBlobDepot {
do {
auto& current = *endIt--;
if (!callback(current.first, current.second)) {
- break;
+ return false;
}
} while (beginIt != endIt);
}
@@ -393,10 +393,11 @@ namespace NKikimr::NBlobDepot {
while (beginIt != endIt) {
auto& current = *beginIt++;
if (!callback(current.first, current.second)) {
- break;
+ return false;
}
}
}
+ return true;
}
const TValue *FindKey(const TKey& key) const;