summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <[email protected]>2022-08-30 09:26:02 +0300
committeralexvru <[email protected]>2022-08-30 09:26:02 +0300
commitf1dc9cbd2577c2a61181b0095146755c1f8b8c35 (patch)
tree185b91826c24326591166d5e16b650d7b71962ed
parentd539d1c8b29f406e83140be05ced18eac998b4f4 (diff)
BlobDepot work in progress
-rw-r--r--ydb/core/blob_depot/assimilator.cpp157
-rw-r--r--ydb/core/blob_depot/assimilator.h1
-rw-r--r--ydb/core/blob_depot/data.cpp300
-rw-r--r--ydb/core/blob_depot/data.h72
-rw-r--r--ydb/core/blob_depot/data_load.cpp7
-rw-r--r--ydb/core/blob_depot/data_resolve.cpp8
-rw-r--r--ydb/core/blob_depot/garbage_collection.cpp313
-rw-r--r--ydb/core/blob_depot/garbage_collection.h8
-rw-r--r--ydb/core/blob_depot/given_id_range.cpp13
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp43
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.h1
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_response.cpp4
-rw-r--r--ydb/core/keyvalue/keyvalue_storage_read_request.cpp7
13 files changed, 586 insertions, 348 deletions
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp
index f7fc00806b1..84dae88a4da 100644
--- a/ydb/core/blob_depot/assimilator.cpp
+++ b/ydb/core/blob_depot/assimilator.cpp
@@ -42,6 +42,8 @@ namespace NKikimr::NBlobDepot {
}
void TAssimilator::PassAway() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT52, "TAssimilator::PassAway", (Id, Self->GetLogId()));
+ TActorBootstrapped::PassAway();
}
STATEFN(TAssimilator::StateFunc) {
@@ -51,6 +53,7 @@ namespace NKikimr::NBlobDepot {
switch (const ui32 type = ev->GetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvAssimilateResult, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
hFunc(TEvBlobStorage::TEvGetResult, Handle);
hFunc(TEvBlobStorage::TEvPutResult, Handle);
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
@@ -58,6 +61,7 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
hFunc(TEvBlobStorage::TEvControllerGroupDecommittedResponse, Handle);
cFunc(TEvPrivate::EvResume, Action);
+ cFunc(TEvents::TSystem::Poison, PassAway);
default:
Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type);
@@ -79,15 +83,28 @@ namespace NKikimr::NBlobDepot {
}
void TAssimilator::SendAssimilateRequest() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT53, "TAssimilator::SendAssimilateRequest", (Id, Self->GetLogId()),
+ (SelfId, SelfId()), (DecommitGroupId, Self->Config.GetDecommitGroupId()));
SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), new TEvBlobStorage::TEvAssimilate(SkipBlocksUpTo,
SkipBarriersUpTo, SkipBlobsUpTo));
}
+ void TAssimilator::Handle(TEvents::TEvUndelivered::TPtr ev) {
+ STLOG(PRI_ERROR, BLOB_DEPOT, BDT55, "received TEvUndelivered", (Id, Self->GetLogId()), (Sender, ev->Sender),
+ (Cookie, ev->Cookie), (Type, ev->Get()->SourceType), (Reason, ev->Get()->Reason));
+ TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvPrivate::EvResume, 0, SelfId(), {},
+ nullptr, 0));
+ }
+
void TAssimilator::Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev) {
class TTxPutAssimilatedData : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- TAssimilator *Self;
+ TAssimilator* const Self;
std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> Ev;
+ bool BlocksFinished = false;
+ bool BarriersFinished = false;
+
bool UnblockRegisterActorQ = false;
+ bool MoreData = false;
public:
TTxPutAssimilatedData(TAssimilator *self, TEvBlobStorage::TEvAssimilateResult::TPtr ev)
@@ -96,85 +113,102 @@ namespace NKikimr::NBlobDepot {
, Ev(ev->Release().Release())
{}
+ TTxPutAssimilatedData(TTxPutAssimilatedData& predecessor)
+ : TTransactionBase(predecessor.Self->Self)
+ , Self(predecessor.Self)
+ , Ev(std::move(predecessor.Ev))
+ , BlocksFinished(predecessor.BlocksFinished)
+ , BarriersFinished(predecessor.BarriersFinished)
+ {}
+
bool Execute(TTransactionContext& txc, const TActorContext&) override {
NIceDb::TNiceDb db(txc.DB);
- bool blocksFinished = false;
- bool barriersFinished = false;
-
- if (const auto& blocks = Ev->Blocks; !blocks.empty()) {
- Self->SkipBlocksUpTo = blocks.back().TabletId;
- }
- if (const auto& barriers = Ev->Barriers; !barriers.empty()) {
- Self->SkipBarriersUpTo = {barriers.back().TabletId, barriers.back().Channel};
- }
- if (const auto& blobs = Ev->Blobs; !blobs.empty()) {
- Self->SkipBlobsUpTo = blobs.back().Id;
+ const bool wasEmpty = Ev->Blocks.empty() && Ev->Barriers.empty() && Ev->Blobs.empty();
+ if (wasEmpty) {
+ BlocksFinished = BarriersFinished = true;
}
- for (const auto& block : Ev->Blocks) {
+ ui32 maxItems = 10'000;
+ for (auto& blocks = Ev->Blocks; maxItems && !blocks.empty(); blocks.pop_front(), --maxItems) {
+ auto& block = blocks.front();
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT31, "assimilated block", (Id, Self->Self->GetLogId()), (Block, block));
Self->Self->BlocksManager->AddBlockOnDecommit(block, txc);
+ Self->SkipBlocksUpTo.emplace(block.TabletId);
}
- for (const auto& barrier : Ev->Barriers) {
+ for (auto& barriers = Ev->Barriers; maxItems && !barriers.empty(); barriers.pop_front(), --maxItems) {
+ auto& barrier = barriers.front();
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT32, "assimilated barrier", (Id, Self->Self->GetLogId()), (Barrier, barrier));
- Self->Self->BarrierServer->AddBarrierOnDecommit(barrier, txc);
- blocksFinished = true; // there will be no blocks for sure
+ if (!Self->Self->BarrierServer->AddBarrierOnDecommit(barrier, maxItems, txc, this)) {
+ Y_VERIFY(!maxItems);
+ break;
+ }
+ Self->SkipBarriersUpTo.emplace(barrier.TabletId, barrier.Channel);
+ BlocksFinished = true; // there will be no blocks for sure
}
- for (const auto& blob : Ev->Blobs) {
+ for (auto& blobs = Ev->Blobs; maxItems && !blobs.empty(); blobs.pop_front(), --maxItems) {
+ auto& blob = blobs.front();
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT33, "assimilated blob", (Id, Self->Self->GetLogId()), (Blob, blob));
- Self->Self->Data->AddDataOnDecommit(blob, txc);
- blocksFinished = barriersFinished = true; // no blocks and no more barriers
- }
-
- auto& decommitState = Self->Self->DecommitState;
- const auto decommitStateOnEntry = decommitState;
- if (blocksFinished && decommitState < EDecommitState::BlocksFinished) {
- decommitState = EDecommitState::BlocksFinished;
- UnblockRegisterActorQ = true;
- }
- if (barriersFinished && decommitState < EDecommitState::BarriersFinished) {
- decommitState = EDecommitState::BarriersFinished;
+ Self->Self->Data->AddDataOnDecommit(blob, txc, this);
+ Self->SkipBlobsUpTo.emplace(blob.Id);
+ Self->Self->Data->LastAssimilatedBlobId = blob.Id;
+ BlocksFinished = BarriersFinished = true; // no blocks and no more barriers
}
- const bool done = Ev->Blocks.empty() && Ev->Barriers.empty() && Ev->Blobs.empty();
- if (done && decommitState < EDecommitState::BlobsFinished) {
- decommitState = EDecommitState::BlobsFinished;
- }
-
- db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update(
- NIceDb::TUpdate<Schema::Config::DecommitState>(decommitState),
- NIceDb::TUpdate<Schema::Config::AssimilatorState>(Self->SerializeAssimilatorState())
- );
- auto toString = [](EDecommitState state) {
- switch (state) {
- case EDecommitState::Default: return "Default";
- case EDecommitState::BlocksFinished: return "BlocksFinished";
- case EDecommitState::BarriersFinished: return "BarriersFinished";
- case EDecommitState::BlobsFinished: return "BlobsFinished";
- case EDecommitState::BlobsCopied: return "BlobsCopied";
- case EDecommitState::Done: return "Done";
+ if (Ev->Blocks.empty() && Ev->Barriers.empty() && Ev->Blobs.empty()) {
+ auto& decommitState = Self->Self->DecommitState;
+ const auto decommitStateOnEntry = decommitState;
+ if (BlocksFinished && decommitState < EDecommitState::BlocksFinished) {
+ decommitState = EDecommitState::BlocksFinished;
+ UnblockRegisterActorQ = true;
+ }
+ if (BarriersFinished && decommitState < EDecommitState::BarriersFinished) {
+ decommitState = EDecommitState::BarriersFinished;
+ }
+ if (wasEmpty && decommitState < EDecommitState::BlobsFinished) {
+ decommitState = EDecommitState::BlobsFinished;
}
- };
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT47, "decommit state change", (Id, Self->Self->GetLogId()),
- (From, toString(decommitStateOnEntry)), (To, toString(decommitState)),
- (UnblockRegisterActorQ, UnblockRegisterActorQ));
- if (!Ev->Blobs.empty()) {
- Self->Self->Data->LastAssimilatedBlobId = Ev->Blobs.back().Id;
+ db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update(
+ NIceDb::TUpdate<Schema::Config::DecommitState>(decommitState),
+ NIceDb::TUpdate<Schema::Config::AssimilatorState>(Self->SerializeAssimilatorState())
+ );
+
+ auto toString = [](EDecommitState state) {
+ switch (state) {
+ case EDecommitState::Default: return "Default";
+ case EDecommitState::BlocksFinished: return "BlocksFinished";
+ case EDecommitState::BarriersFinished: return "BarriersFinished";
+ case EDecommitState::BlobsFinished: return "BlobsFinished";
+ case EDecommitState::BlobsCopied: return "BlobsCopied";
+ case EDecommitState::Done: return "Done";
+ }
+ };
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT47, "decommit state change", (Id, Self->Self->GetLogId()),
+ (From, toString(decommitStateOnEntry)), (To, toString(decommitState)),
+ (UnblockRegisterActorQ, UnblockRegisterActorQ));
+ } else {
+ MoreData = true;
}
return true;
}
void Complete(const TActorContext&) override {
- if (UnblockRegisterActorQ) {
- STLOG(PRI_INFO, BLOB_DEPOT, BDT35, "blocks assimilation complete", (Id, Self->Self->GetLogId()),
- (DecommitGroupId, Self->Self->Config.GetDecommitGroupId()));
- Self->Self->ProcessRegisterAgentQ();
- }
+ Self->Self->Data->CommitTrash(this);
+
+ if (MoreData) {
+ Self->Self->Execute(std::make_unique<TTxPutAssimilatedData>(*this));
+ } else {
+ if (UnblockRegisterActorQ) {
+ STLOG(PRI_INFO, BLOB_DEPOT, BDT35, "blocks assimilation complete", (Id, Self->Self->GetLogId()),
+ (DecommitGroupId, Self->Self->Config.GetDecommitGroupId()));
+ Self->Self->ProcessRegisterAgentQ();
+ }
- Self->Action();
+ Self->Action();
+ }
}
};
@@ -183,6 +217,8 @@ namespace NKikimr::NBlobDepot {
void TAssimilator::ScanDataForCopying() {
const bool fromTheBeginning = !LastScannedKey;
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT54, "TAssimilator::ScanDataForCopying", (Id, Self->GetLogId()),
+ (LastScannedKey, LastScannedKey));
TData::TKey lastScannedKey;
if (LastScannedKey) {
@@ -241,7 +277,7 @@ namespace NKikimr::NBlobDepot {
for (ui32 i = 0; i < msg.ResponseSz; ++i) {
auto& resp = msg.Responses[i];
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT34, "got TEvGetResult", (Id, Self->GetLogId()), (BlobId, resp.Id),
- (Status, resp.Status));
+ (Status, resp.Status), (NumPutsInFlight, NumPutsInFlight));
if (resp.Status == NKikimrProto::OK) {
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(resp.Id, resp.Buffer, TInstant::Max());
ev->Decommission = true;
@@ -256,7 +292,8 @@ namespace NKikimr::NBlobDepot {
void TAssimilator::Handle(TEvBlobStorage::TEvPutResult::TPtr ev) {
auto& msg = *ev->Get();
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "got TEvPutResult", (Id, Self->GetLogId()), (Msg, msg));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "got TEvPutResult", (Id, Self->GetLogId()), (Msg, msg),
+ (NumPutsInFlight, NumPutsInFlight));
if (msg.Status == NKikimrProto::OK) {
const size_t numErased = NeedfulBlobs.erase(msg.Id);
Y_VERIFY(numErased == 1);
diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h
index 2a1a7117288..aa6d4088d28 100644
--- a/ydb/core/blob_depot/assimilator.h
+++ b/ydb/core/blob_depot/assimilator.h
@@ -43,6 +43,7 @@ namespace NKikimr::NBlobDepot {
private:
void Action();
void SendAssimilateRequest();
+ void Handle(TEvents::TEvUndelivered::TPtr ev);
void Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev);
void ScanDataForCopying();
void Handle(TEvBlobStorage::TEvGetResult::TPtr ev);
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index a13ea999d60..e5ecfc81579 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -5,9 +5,125 @@ namespace NKikimr::NBlobDepot {
using TData = TBlobDepot::TData;
- NKikimrBlobDepot::EKeepState TData::GetKeepState(const TKey& key) const {
+ enum class EUpdateOutcome {
+ CHANGE,
+ NO_CHANGE,
+ DROP
+ };
+
+ template<typename T, typename... TArgs>
+ bool TData::UpdateKey(TKey key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie, T&& callback, TArgs&&... args) {
+ bool underSoft = false, underHard = false;
+ auto var = key.AsVariant();
+ if (auto *id = std::get_if<TLogoBlobID>(&var)) {
+ Self->BarrierServer->GetBlobBarrierRelation(*id, &underSoft, &underHard);
+ }
+ if (underHard || underSoft) {
+ if (const auto it = Data.find(key); it == Data.end()) {
+ return false; // no such key existed and will not be created as it hits the barrier
+ } else {
+ Y_VERIFY_S(!underHard && it->second.KeepState == NKikimrBlobDepot::EKeepState::Keep,
+ "barrier invariant failed Key# " << key.ToString() << " Value# " << it->second.ToString());
+ }
+ }
+
+ const auto [it, inserted] = Data.try_emplace(std::move(key), std::forward<TArgs>(args)...);
+ {
+ auto& [key, value] = *it;
+ Y_VERIFY(!underHard);
+ Y_VERIFY(!underSoft || !inserted);
+
+ std::vector<TLogoBlobID> deleteQ;
+
+ if (!inserted) {
+ EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) {
+ const auto it = RefCount.find(id);
+ Y_VERIFY(it != RefCount.end());
+ if (!--it->second) {
+ deleteQ.push_back(id);
+ }
+ });
+ }
+
+ EUpdateOutcome outcome = callback(value, inserted);
+
+ Y_VERIFY(!inserted || outcome != EUpdateOutcome::NO_CHANGE);
+ if (underSoft && value.KeepState != NKikimrBlobDepot::EKeepState::Keep) {
+ outcome = EUpdateOutcome::DROP;
+ }
+
+ EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) {
+ const auto [it, inserted] = RefCount.try_emplace(id, 1);
+ if (inserted) {
+ // first mention of this id
+ auto& record = GetRecordsPerChannelGroup(id);
+ const auto [_, inserted] = record.Used.insert(id);
+ Y_VERIFY(inserted);
+ AccountBlob(id, 1);
+
+ // blob is first mentioned and deleted as well
+ if (outcome == EUpdateOutcome::DROP) {
+ it->second = 0;
+ deleteQ.push_back(id);
+ }
+ } else if (outcome != EUpdateOutcome::DROP) {
+ ++it->second;
+ }
+ });
+
+ for (const TLogoBlobID& id : deleteQ) {
+ const auto it = RefCount.find(id);
+ Y_VERIFY(it != RefCount.end());
+ if (!it->second) {
+ InFlightTrash.emplace(cookie, id);
+ NIceDb::TNiceDb(txc.DB).Table<Schema::Trash>().Key(id.AsBinaryString()).Update();
+ RefCount.erase(it);
+ }
+ }
+
+ auto row = NIceDb::TNiceDb(txc.DB).Table<Schema::Data>().Key(key.MakeBinaryKey());
+ switch (outcome) {
+ case EUpdateOutcome::DROP:
+ Data.erase(it);
+ row.Delete();
+ return true;
+
+ case EUpdateOutcome::CHANGE:
+ row.template Update<Schema::Data::Value>(value.SerializeToString());
+ return true;
+
+ case EUpdateOutcome::NO_CHANGE:
+ return false;
+ }
+ }
+ }
+
+ const TData::TValue *TData::FindKey(const TKey& key) const {
const auto it = Data.find(key);
- return it != Data.end() ? it->second.KeepState : NKikimrBlobDepot::EKeepState::Default;
+ return it != Data.end() ? &it->second : nullptr;
+ }
+
+ void TData::UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item,
+ NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "UpdateKey", (Id, Self->GetLogId()), (Key, key), (Item, item));
+ UpdateKey(key, txc, cookie, [&](TValue& value, bool inserted) {
+ if (!inserted) { // update value items
+ value.Meta = item.GetMeta();
+ value.Public = false;
+ value.Unconfirmed = item.GetUnconfirmed();
+
+ // update it to keep new blob locator
+ value.ValueChain.Clear();
+ auto *chain = value.ValueChain.Add();
+ auto *locator = chain->MutableLocator();
+ locator->CopyFrom(item.GetBlobLocator());
+
+ // reset original blob id, if any
+ value.OriginalBlobId.reset();
+ }
+
+ return EUpdateOutcome::CHANGE;
+ }, item);
}
TData::TRecordsPerChannelGroup& TData::GetRecordsPerChannelGroup(TLogoBlobID id) {
@@ -19,50 +135,44 @@ namespace NKikimr::NBlobDepot {
return it->second;
}
- void TData::AddDataOnLoad(TKey key, TString value) {
- if (Data.contains(key)) {
- return; // we are racing with the offline load procedure -- skip this key, it is already new in memory
- }
-
+ void TData::AddDataOnLoad(TKey key, TString value, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
NKikimrBlobDepot::TValue proto;
const bool success = proto.ParseFromString(value);
Y_VERIFY(success);
- PutKey(std::move(key), TValue(std::move(proto)));
- }
-
- void TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
- NTabletFlatExecutor::TTransactionContext& txc) {
- TKey key(blob.Id);
-
- bool underSoft, underHard;
- Self->BarrierServer->GetBlobBarrierRelation(blob.Id, &underSoft, &underHard);
-
- // calculate keep state for this blob
- const auto it = Data.find(key);
- const NKikimrBlobDepot::EKeepState keepState = Max(it != Data.end() ? it->second.KeepState : NKikimrBlobDepot::EKeepState::Default,
- blob.DoNotKeep ? NKikimrBlobDepot::EKeepState::DoNotKeep :
- blob.Keep ? NKikimrBlobDepot::EKeepState::Keep : NKikimrBlobDepot::EKeepState::Default);
-
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "AddDataOnDecommit", (Id, Self->GetLogId()), (Blob, blob),
- (UnderHard, underHard), (UnderSoft, underSoft), (KeepState, keepState));
- if (underHard || (underSoft && keepState != NKikimrBlobDepot::EKeepState::Keep)) {
- return; // we can skip this blob as it is already being collected
- }
+ UpdateKey(std::move(key), txc, cookie, [&](TValue& value, bool inserted) {
+ if (!inserted) { // do some merge logic
+ value.KeepState = Max(value.KeepState, proto.GetKeepState());
+ if (value.ValueChain.empty() && proto.ValueChainSize()) {
+ value.ValueChain.CopyFrom(proto.GetValueChain());
+ value.OriginalBlobId.reset();
+ }
+ }
- NKikimrBlobDepot::TValue value;
- value.SetKeepState(keepState);
- value.SetUnconfirmed(true);
- LogoBlobIDFromLogoBlobID(blob.Id, value.MutableOriginalBlobId());
+ return EUpdateOutcome::CHANGE;
+ }, std::move(proto));
+ }
- TString valueData;
- const bool success = value.SerializeToString(&valueData);
- Y_VERIFY(success);
+ void TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
+ NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
+ UpdateKey(TKey(blob.Id), txc, cookie, [&](TValue& value, bool inserted) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "AddDataOnDecommit", (Id, Self->GetLogId()), (Blob, blob),
+ (Value, value), (Inserted, inserted));
+
+ // update keep state if necessary
+ if (blob.DoNotKeep && value.KeepState < NKikimrBlobDepot::EKeepState::DoNotKeep) {
+ value.KeepState = NKikimrBlobDepot::EKeepState::DoNotKeep;
+ } else if (blob.Keep && value.KeepState < NKikimrBlobDepot::EKeepState::Keep) {
+ value.KeepState = NKikimrBlobDepot::EKeepState::Keep;
+ }
- NIceDb::TNiceDb db(txc.DB);
- db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Update<Schema::Data::Value>(valueData);
+ // if there is not value chain for this blob, map it to the original blob id
+ if (value.ValueChain.empty()) {
+ value.OriginalBlobId = blob.Id;
+ }
- PutKey(key, TValue(std::move(value)));
+ return EUpdateOutcome::CHANGE;
+ });
}
void TData::AddTrashOnLoad(TLogoBlobID id) {
@@ -80,84 +190,37 @@ namespace NKikimr::NBlobDepot {
record.LastConfirmedGenStep = confirmedGenStep;
}
- void TData::PutKey(TKey key, TValue&& data) {
- ui64 referencedBytes = 0;
-
- EnumerateBlobsForValueChain(data.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32 /*begin*/, ui32 /*end*/) {
- if (!RefCount[id]++) {
- // first mention of this id
- auto& record = GetRecordsPerChannelGroup(id);
- const auto [_, inserted] = record.Used.insert(id);
- Y_VERIFY(inserted);
- AccountBlob(id, 1);
- }
- referencedBytes += id.BlobSize();
- });
-
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "PutKey", (Id, Self->GetLogId()), (Key, key), (Value, data));
-
- const auto [it, inserted] = Data.try_emplace(std::move(key), std::move(data));
- if (!inserted) {
- it->second = std::move(data);
- }
+ bool TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState,
+ NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
+ return UpdateKey(std::move(key), txc, cookie, [&](TValue& value, bool inserted) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT51, "UpdateKeepState", (Id, Self->GetLogId()), (Key, key),
+ (KeepState, keepState), (Value, value));
+ if (inserted) {
+ return EUpdateOutcome::CHANGE;
+ } else if (value.KeepState < keepState) {
+ value.KeepState = keepState;
+ return EUpdateOutcome::CHANGE;
+ } else {
+ return EUpdateOutcome::NO_CHANGE;
+ }
+ }, keepState);
}
- std::optional<TString> TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT51, "UpdateKeepState", (Id, Self->GetLogId()), (Key, key),
- (KeepState, keepState));
-
- const auto [it, inserted] = Data.try_emplace(std::move(key), TValue(keepState));
- if (!inserted) {
- if (keepState <= it->second.KeepState) {
- return std::nullopt;
- }
- it->second.KeepState = keepState;
- }
- return ToValueProto(it->second);
- }
-
- void TData::DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie) {
- const auto it = Data.find(key);
- Y_VERIFY(it != Data.end());
- TValue& value = it->second;
- EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32 /*begin*/, ui32 /*end*/) {
- const auto it = RefCount.find(id);
- Y_VERIFY(it != RefCount.end());
- if (!--it->second) {
- InFlightTrash.emplace(cookie, id);
- RefCount.erase(it);
- updateTrash(id);
- }
+ void TData::DeleteKey(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT14, "DeleteKey", (Id, Self->GetLogId()), (Key, key));
+ UpdateKey(key, txc, cookie, [&](TValue&, bool inserted) {
+ Y_VERIFY(!inserted);
+ return EUpdateOutcome::DROP;
});
- Data.erase(it);
}
void TData::CommitTrash(void *cookie) {
- auto range = InFlightTrash.equal_range(cookie);
- for (auto it = range.first; it != range.second; ++it) {
+ auto [first, last] = InFlightTrash.equal_range(cookie);
+ for (auto it = first; it != last; ++it) {
auto& record = GetRecordsPerChannelGroup(it->second);
record.MoveToTrash(this, it->second);
}
- InFlightTrash.erase(range.first, range.second);
- }
-
- TString TData::ToValueProto(const TValue& value) {
- NKikimrBlobDepot::TValue proto;
- if (value.Meta) {
- proto.SetMeta(value.Meta);
- }
- proto.MutableValueChain()->CopyFrom(value.ValueChain);
- if (proto.GetKeepState() != value.KeepState) {
- proto.SetKeepState(value.KeepState);
- }
- if (proto.GetPublic() != value.Public) {
- proto.SetPublic(value.Public);
- }
-
- TString s;
- const bool success = proto.SerializeToString(&s);
- Y_VERIFY(success);
- return s;
+ InFlightTrash.erase(first, last);
}
void TData::HandleTrash() {
@@ -369,6 +432,29 @@ namespace NKikimr::NBlobDepot {
it->second.ClearInFlight(this);
}
+ bool TData::OnBarrierShift(ui64 tabletId, ui8 channel, bool hard, TGenStep previous, TGenStep current, ui32& maxItems,
+ NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
+ const TData::TKey first(TLogoBlobID(tabletId, previous.Generation(), previous.Step(), channel, 0, 0));
+ const TData::TKey last(TLogoBlobID(tabletId, current.Generation(), current.Step(), channel,
+ TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode));
+
+ bool finished = true;
+ Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_END, [&](auto& key, auto& value) {
+ if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) {
+ if (maxItems) {
+ Self->Data->DeleteKey(key, txc, cookie);
+ --maxItems;
+ } else {
+ finished = false;
+ return false;
+ }
+ }
+ return true;
+ });
+
+ return finished;
+ }
+
void TData::AccountBlob(TLogoBlobID id, bool add) {
// account record
const ui32 groupId = Self->Info()->GroupFor(id.Channel(), id.Generation());
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 7a4c44c2cdb..d94b8e6a318 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -210,12 +210,12 @@ namespace NKikimr::NBlobDepot {
struct TValue {
TString Meta;
TValueChain ValueChain;
- NKikimrBlobDepot::EKeepState KeepState;
- bool Public;
- bool Unconfirmed;
+ NKikimrBlobDepot::EKeepState KeepState = NKikimrBlobDepot::EKeepState::Default;
+ bool Public = false;
+ bool Unconfirmed = false;
std::optional<TLogoBlobID> OriginalBlobId;
- TValue() = delete;
+ TValue() = default;
TValue(const TValue&) = delete;
TValue(TValue&&) = default;
@@ -233,12 +233,52 @@ namespace NKikimr::NBlobDepot {
: std::nullopt)
{}
+ explicit TValue(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item)
+ : Meta(item.GetMeta())
+ , Public(false)
+ , Unconfirmed(item.GetUnconfirmed())
+ {
+ auto *chain = ValueChain.Add();
+ auto *locator = chain->MutableLocator();
+ locator->CopyFrom(item.GetBlobLocator());
+ }
+
explicit TValue(NKikimrBlobDepot::EKeepState keepState)
: KeepState(keepState)
, Public(false)
, Unconfirmed(false)
{}
+ void SerializeToProto(NKikimrBlobDepot::TValue *proto) const {
+ if (Meta) {
+ proto->SetMeta(Meta);
+ }
+ if (!ValueChain.empty()) {
+ proto->MutableValueChain()->CopyFrom(ValueChain);
+ }
+ if (KeepState != proto->GetKeepState()) {
+ proto->SetKeepState(KeepState);
+ }
+ if (Public != proto->GetPublic()) {
+ proto->SetPublic(Public);
+ }
+ if (Unconfirmed != proto->GetUnconfirmed()) {
+ proto->SetUnconfirmed(Unconfirmed);
+ }
+ if (OriginalBlobId) {
+ LogoBlobIDFromLogoBlobID(*OriginalBlobId, proto->MutableOriginalBlobId());
+ }
+ }
+
+ TString SerializeToString() const {
+ NKikimrBlobDepot::TValue proto;
+ SerializeToProto(&proto);
+ TString s;
+ const bool success = proto.SerializeToString(&s);
+ Y_VERIFY(success);
+ return s;
+ }
+
TString ToString() const {
TStringStream s;
Output(s);
@@ -359,24 +399,32 @@ namespace NKikimr::NBlobDepot {
}
}
- NKikimrBlobDepot::EKeepState GetKeepState(const TKey& key) const;
+ const TValue *FindKey(const TKey& key) const;
+
+ template<typename T, typename... TArgs>
+ bool UpdateKey(TKey key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie, T&& callback, TArgs&&... args);
+
+ void UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item,
+ NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
TRecordsPerChannelGroup& GetRecordsPerChannelGroup(TLogoBlobID id);
- void AddDataOnLoad(TKey key, TString value);
- void AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc);
+ void AddDataOnLoad(TKey key, TString value, NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
+ void AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
+ NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
void AddTrashOnLoad(TLogoBlobID id);
void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep);
- void PutKey(TKey key, TValue&& data);
-
- std::optional<TString> UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState);
- void DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie);
+ bool UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState,
+ NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
+ void DeleteKey(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
void CommitTrash(void *cookie);
void HandleTrash();
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev);
void OnPushNotifyResult(TEvBlobDepot::TEvPushNotifyResult::TPtr ev);
void OnCommitConfirmedGC(ui8 channel, ui32 groupId);
+ bool OnBarrierShift(ui64 tabletId, ui8 channel, bool hard, TGenStep previous, TGenStep current, ui32& maxItems,
+ NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
void AccountBlob(TLogoBlobID id, bool add);
@@ -384,8 +432,6 @@ namespace NKikimr::NBlobDepot {
void OnLeastExpectedBlobIdChange(ui8 channel);
- static TString ToValueProto(const TValue& value);
-
template<typename TCallback>
void EnumerateRefCount(TCallback&& callback) {
for (const auto& [key, value] : RefCount) {
diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp
index 1356ca11c11..b39f806757f 100644
--- a/ydb/core/blob_depot/data_load.cpp
+++ b/ydb/core/blob_depot/data_load.cpp
@@ -36,6 +36,7 @@ namespace NKikimr::NBlobDepot {
TLoadState LoadState;
std::unique_ptr<TTxLoad> SuccessorTx;
+ TTransactionContext *Txc;
public:
TTxLoad(TBlobDepot *self, TLoadState loadState = TLoadDataBegin{})
@@ -51,6 +52,8 @@ namespace NKikimr::NBlobDepot {
bool Execute(TTransactionContext& txc, const TActorContext&) override {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT28, "TData::TTxLoad::Execute", (Id, Self->GetLogId()));
+ Txc = &txc;
+
NIceDb::TNiceDb db(txc.DB);
bool progress = false;
auto visitor = [&](auto& item) { return ExecuteLoad(db, item, progress); };
@@ -117,7 +120,7 @@ namespace NKikimr::NBlobDepot {
template<typename T>
void ProcessRow(T&& row, Schema::Data*) {
auto key = TData::TKey::FromBinaryKey(row.template GetValue<Schema::Data::Key>(), Self->Config);
- Self->Data->AddDataOnLoad(key, row.template GetValue<Schema::Data::Value>());
+ Self->Data->AddDataOnLoad(key, row.template GetValue<Schema::Data::Value>(), *Txc, this);
Y_VERIFY(!Self->Data->LastLoadedKey || *Self->Data->LastLoadedKey < key);
Self->Data->LastLoadedKey = std::move(key);
}
@@ -141,6 +144,8 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT29, "TData::TTxLoad::Complete", (Id, Self->GetLogId()),
(SuccessorTx, bool(SuccessorTx)), (LoadState.index, LoadState.index()));
+ Self->Data->CommitTrash(this);
+
if (SuccessorTx) {
Self->Execute(std::move(SuccessorTx));
} else {
diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp
index f8d9f897b7e..4f9464d6d30 100644
--- a/ydb/core/blob_depot/data_resolve.cpp
+++ b/ydb/core/blob_depot/data_resolve.cpp
@@ -111,7 +111,7 @@ namespace NKikimr::NBlobDepot {
if (key != LastScannedKey) {
LastScannedKey = key;
progress = true;
- Self->Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>());
+ Self->Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>(), txc, this);
const bool matchBegin = !begin || (flags & EScanFlags::INCLUDE_BEGIN ? *begin <= key : *begin < key);
const bool matchEnd = !end || (flags & EScanFlags::INCLUDE_END ? key <= *end : key < *end);
@@ -162,6 +162,8 @@ namespace NKikimr::NBlobDepot {
(Sender, Request->Sender), (Cookie, Request->Cookie), (SuccessorTx, bool(SuccessorTx)),
(Outbox.size, Outbox.size()));
+ Self->Data->CommitTrash(this);
+
if (SuccessorTx) {
Self->Execute(std::move(SuccessorTx));
} else {
@@ -354,13 +356,15 @@ namespace NKikimr::NBlobDepot {
.Id = response.Id,
.Keep = response.Keep,
.DoNotKeep = response.DoNotKeep
- }, txc);
+ }, txc, this);
}
}
return true;
}
void Complete(const TActorContext&) override {
+ Self->Data->CommitTrash(this);
+
auto& contexts = Self->Data->ResolveDecommitContexts;
if (const auto it = contexts.find(Ev->Cookie); it != contexts.end()) {
TResolveDecommitContext& context = it->second;
diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp
index 71119ed8b95..b90254400bc 100644
--- a/ydb/core/blob_depot/garbage_collection.cpp
+++ b/ydb/core/blob_depot/garbage_collection.cpp
@@ -4,6 +4,107 @@
namespace NKikimr::NBlobDepot {
+ using TAdvanceBarrierCallback = std::function<void(std::optional<TString>)>;
+
+ class TBlobDepot::TBarrierServer::TTxAdvanceBarrier : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ static constexpr ui32 MaxKeysToProcessAtOnce = 10'000;
+
+ const ui64 TabletId;
+ const ui8 Channel;
+ const bool Hard;
+ const TGenStep GenCtr;
+ const TGenStep CollectGenStep;
+ TAdvanceBarrierCallback Callback;
+
+ bool MoreData = false;
+ bool Success = false;
+
+ public:
+ TTxAdvanceBarrier(TBlobDepot *self, ui64 tabletId, ui8 channel, bool hard, TGenStep genCtr, TGenStep collectGenStep,
+ TAdvanceBarrierCallback callback)
+ : TTransactionBase(self)
+ , TabletId(tabletId)
+ , Channel(channel)
+ , Hard(hard)
+ , GenCtr(genCtr)
+ , CollectGenStep(collectGenStep)
+ , Callback(std::move(callback))
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ Y_VERIFY(Self->Data->IsLoaded());
+
+ const auto key = std::make_tuple(TabletId, Channel);
+ auto& barriers = Self->BarrierServer->Barriers;
+ const auto [it, inserted] = barriers.try_emplace(key);
+ if (!inserted) {
+ // extract existing barrier record
+ auto& barrier = it->second;
+ const TGenStep barrierGenCtr = Hard ? barrier.HardGenCtr : barrier.SoftGenCtr;
+ const TGenStep barrierGenStep = Hard ? barrier.Hard : barrier.Soft;
+
+ // validate them
+ if (GenCtr < barrierGenCtr) {
+ Callback("record generation:counter is obsolete");
+ return true;
+ } else if (GenCtr == barrierGenCtr) {
+ if (barrierGenStep != CollectGenStep) {
+ Callback("repeated command with different collect parameters received");
+ return true;
+ }
+ } else if (CollectGenStep < barrierGenStep) {
+ Callback("decreasing barrier");
+ return true;
+ }
+ }
+
+ TBarrier& barrier = it->second;
+ TGenStep& barrierGenCtr = Hard ? barrier.HardGenCtr : barrier.SoftGenCtr;
+ TGenStep& barrierGenStep = Hard ? barrier.Hard : barrier.Soft;
+
+ Y_VERIFY(barrierGenCtr <= GenCtr);
+ Y_VERIFY(barrierGenStep <= CollectGenStep);
+
+ ui32 maxItems = MaxKeysToProcessAtOnce;
+ if (!Self->Data->OnBarrierShift(TabletId, Channel, Hard, barrierGenStep, CollectGenStep, maxItems, txc, this)) {
+ MoreData = true;
+ return true;
+ }
+
+ barrierGenCtr = GenCtr;
+ barrierGenStep = CollectGenStep;
+
+ Self->BarrierServer->ValidateBlobInvariant(TabletId, Channel);
+
+ auto row = NIceDb::TNiceDb(txc.DB).Table<Schema::Barriers>().Key(TabletId, Channel);
+ if (Hard) {
+ row.Update(
+ NIceDb::TUpdate<Schema::Barriers::HardGenCtr>(ui64(GenCtr)),
+ NIceDb::TUpdate<Schema::Barriers::Hard>(ui64(CollectGenStep))
+ );
+ } else {
+ row.Update(
+ NIceDb::TUpdate<Schema::Barriers::SoftGenCtr>(ui64(GenCtr)),
+ NIceDb::TUpdate<Schema::Barriers::Soft>(ui64(CollectGenStep))
+ );
+ }
+
+ Success = true;
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ Self->Data->CommitTrash(this);
+
+ if (MoreData) {
+ Self->Execute(std::make_unique<TTxAdvanceBarrier>(Self, TabletId, Channel, Hard, GenCtr, CollectGenStep,
+ std::move(Callback)));
+ } else if (Success) {
+ Callback(std::nullopt);
+ }
+ }
+ };
+
class TBlobDepot::TBarrierServer::TTxCollectGarbage : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
std::optional<TString> Error;
@@ -13,8 +114,9 @@ namespace NKikimr::NBlobDepot {
const ui8 Channel;
int KeepIndex = 0;
int DoNotKeepIndex = 0;
+
ui32 NumKeysProcessed = 0;
- bool Done = false;
+ bool MoreData = false;
static constexpr ui32 MaxKeysToProcessAtOnce = 10'000;
@@ -30,69 +132,26 @@ namespace NKikimr::NBlobDepot {
{}
bool Execute(TTransactionContext& txc, const TActorContext&) override {
- Y_VERIFY(Self->Data->IsLoaded());
- Validate();
- if (!Error) {
- auto& record = Request->Get()->Record;
- Done = ProcessFlags(txc, KeepIndex, record.GetKeep(), NKikimrBlobDepot::EKeepState::Keep)
- && ProcessFlags(txc, DoNotKeepIndex, record.GetDoNotKeep(), NKikimrBlobDepot::EKeepState::DoNotKeep)
- && ProcessBarrier(txc);
- }
+ auto& record = Request->Get()->Record;
+ MoreData = !ProcessFlags(txc, KeepIndex, record.GetKeep(), NKikimrBlobDepot::EKeepState::Keep)
+ || !ProcessFlags(txc, DoNotKeepIndex, record.GetDoNotKeep(), NKikimrBlobDepot::EKeepState::DoNotKeep);
return true;
}
void Complete(const TActorContext&) override {
- Self->Data->CommitTrash(this);
-
- if (Done || Error) {
- auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(),
- Error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(Error));
- TActivationContext::Send(response.release());
- Barrier.ProcessingQ.pop_front();
- Self->Data->HandleTrash();
- if (!Barrier.ProcessingQ.empty()) {
- Self->Execute(std::make_unique<TTxCollectGarbage>(Self, TabletId, Channel));
- }
- } else {
+ if (MoreData) {
Self->Execute(std::make_unique<TTxCollectGarbage>(Self, TabletId, Channel, KeepIndex, DoNotKeepIndex));
- }
- }
-
- void Validate() {
- // validate the command first
- auto& record = Request->Get()->Record;
- if (record.HasCollectGeneration() && record.HasCollectStep()) {
- if (!record.HasTabletId() || !record.HasChannel() || record.GetChannel() > TLogoBlobID::MaxChannel) {
- Error = "TabletId/Channel are either not set or invalid";
- } else if (!record.HasGeneration() || !record.HasPerGenerationCounter()) {
- Error = "Generation/PerGenerationCounter are not set";
- } else {
- const auto key = std::make_tuple(record.GetTabletId(), record.GetChannel());
- auto& barriers = Self->BarrierServer->Barriers;
- if (const auto it = barriers.find(key); it != barriers.end()) {
- // extract existing barrier record
- auto& barrier = it->second;
- const TGenStep barrierGenCtr = record.GetHard() ? barrier.HardGenCtr : barrier.SoftGenCtr;
- const TGenStep barrierGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
-
- // extract new parameters from protobuf
- const TGenStep genCtr(record.GetGeneration(), record.GetPerGenerationCounter());
- const TGenStep collectGenStep(record.GetCollectGeneration(), record.GetCollectStep());
-
- // validate them
- if (genCtr < barrierGenCtr) {
- Error = "record generation:counter is obsolete";
- } else if (genCtr == barrierGenCtr) {
- if (barrierGenStep != collectGenStep) {
- Error = "repeated command with different collect parameters received";
- }
- } else if (collectGenStep < barrierGenStep) {
- Error = "decreasing barrier";
- }
- }
- }
+ } else if (auto& record = Request->Get()->Record; record.HasCollectGeneration() && record.HasCollectStep()) {
+ Self->Execute(std::make_unique<TTxAdvanceBarrier>(Self, TabletId, Channel, record.GetHard(),
+ TGenStep(record.GetGeneration(), record.GetPerGenerationCounter()),
+ TGenStep(record.GetCollectGeneration(), record.GetCollectStep()),
+ [self = Self, tabletId = TabletId, channel = Channel](std::optional<TString> error) {
+ self->BarrierServer->FinishRequest(tabletId, channel, std::move(error));
+ }));
} else if (record.HasCollectGeneration() || record.HasCollectStep()) {
- Error = "CollectGeneration/CollectStep set incorrectly";
+ Self->BarrierServer->FinishRequest(TabletId, Channel, "incorrect CollectGeneration/CollectStep setting");
+ } else { // do not collect anything here
+ Self->BarrierServer->FinishRequest(TabletId, Channel, std::nullopt);
}
}
@@ -103,76 +162,10 @@ namespace NKikimr::NBlobDepot {
for (; index < items.size() && NumKeysProcessed < MaxKeysToProcessAtOnce; ++index) {
const auto id = LogoBlobIDFromLogoBlobID(items[index]);
TData::TKey key(id);
- if (const auto& value = Self->Data->UpdateKeepState(key, state)) {
- db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Update<Schema::Data::Value>(*value);
- ++NumKeysProcessed;
- }
+ NumKeysProcessed += Self->Data->UpdateKeepState(key, state, txc, this);
}
-
return index == items.size();
}
-
- bool ProcessBarrier(TTransactionContext& txc) {
- NIceDb::TNiceDb db(txc.DB);
-
- const auto& record = Request->Get()->Record;
- if (record.HasCollectGeneration() && record.HasCollectStep()) {
- const bool hard = record.GetHard();
-
- auto processKey = [&](const TData::TKey& key, const TData::TValue& value) {
- if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) {
- const TLogoBlobID id(key.GetBlobId());
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT14, "DeleteKey", (Id, Self->GetLogId()), (BlobId, id));
- db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Delete();
- auto updateTrash = [&](TLogoBlobID id) {
- db.Table<Schema::Trash>().Key(id.AsBinaryString()).Update();
- };
- Self->Data->DeleteKey(key, updateTrash, this);
- ++NumKeysProcessed;
- }
-
- return NumKeysProcessed < MaxKeysToProcessAtOnce;
- };
-
- const TData::TKey first(TLogoBlobID(record.GetTabletId(), 0, 0, record.GetChannel(), 0, 0));
- const TData::TKey last(TLogoBlobID(record.GetTabletId(), record.GetCollectGeneration(),
- record.GetCollectStep(), record.GetChannel(), TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie,
- TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode));
-
- Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_BEGIN | TData::EScanFlags::INCLUDE_END,
- processKey);
-
- if (NumKeysProcessed == MaxKeysToProcessAtOnce) {
- return false;
- }
-
- const auto key = std::make_tuple(record.GetTabletId(), record.GetChannel());
- auto& barrier = Self->BarrierServer->Barriers[key];
- TGenStep& barrierGenCtr = record.GetHard() ? barrier.HardGenCtr : barrier.SoftGenCtr;
- TGenStep& barrierGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
-
- const TGenStep genCtr(record.GetGeneration(), record.GetPerGenerationCounter());
- const TGenStep collectGenStep(record.GetCollectGeneration(), record.GetCollectStep());
- Y_VERIFY(barrierGenCtr <= genCtr);
- barrierGenCtr = genCtr;
- Y_VERIFY(barrierGenStep <= collectGenStep);
- barrierGenStep = collectGenStep;
-
- if (record.GetHard()) {
- db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()).Update(
- NIceDb::TUpdate<Schema::Barriers::HardGenCtr>(ui64(genCtr)),
- NIceDb::TUpdate<Schema::Barriers::Hard>(ui64(collectGenStep))
- );
- } else {
- db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()).Update(
- NIceDb::TUpdate<Schema::Barriers::SoftGenCtr>(ui64(genCtr)),
- NIceDb::TUpdate<Schema::Barriers::Soft>(ui64(collectGenStep))
- );
- }
- }
-
- return true;
- }
};
void TBlobDepot::TBarrierServer::AddBarrierOnLoad(ui64 tabletId, ui8 channel, TGenStep softGenCtr, TGenStep soft,
@@ -183,27 +176,33 @@ namespace NKikimr::NBlobDepot {
.HardGenCtr = hardGenCtr,
.Hard = hard,
};
+
+ Self->BarrierServer->ValidateBlobInvariant(tabletId, channel);
}
- void TBlobDepot::TBarrierServer::AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier,
- NTabletFlatExecutor::TTransactionContext& txc) {
+ bool TBlobDepot::TBarrierServer::AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier,
+ ui32& maxItems, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
NIceDb::TNiceDb db(txc.DB);
const auto key = std::make_tuple(barrier.TabletId, barrier.Channel);
auto& current = Barriers[key];
-#define DO(TYPE) \
+#define DO(TYPE, HARD) \
{ \
const TGenStep barrierGenCtr(barrier.TYPE.RecordGeneration, barrier.TYPE.PerGenerationCounter); \
const TGenStep barrierCollect(barrier.TYPE.CollectGeneration, barrier.TYPE.CollectStep); \
if (current.TYPE##GenCtr < barrierGenCtr) { \
if (current.TYPE <= barrierCollect) { \
+ if (!Self->Data->OnBarrierShift(barrier.TabletId, barrier.Channel, HARD, current.TYPE, \
+ barrierCollect, maxItems, txc, cookie)) { \
+ return false; \
+ } \
current.TYPE##GenCtr = barrierGenCtr; \
current.TYPE = barrierCollect; \
db.Table<Schema::Barriers>().Key(barrier.TabletId, barrier.Channel).Update( \
NIceDb::TUpdate<Schema::Barriers::TYPE##GenCtr>(ui64(barrierGenCtr)), \
NIceDb::TUpdate<Schema::Barriers::TYPE>(ui64(barrierCollect)) \
); \
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT45, "replacing " #TYPE " barrier through decommission", \
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT45, "replaced " #TYPE " barrier through decommission", \
(TabletId, barrier.TabletId), (Channel, int(barrier.Channel)), \
(GenCtr, current.TYPE##GenCtr), (Collect, current.TYPE), \
(Barrier, barrier)); \
@@ -221,8 +220,12 @@ namespace NKikimr::NBlobDepot {
} \
}
- DO(Hard)
- DO(Soft)
+ DO(Hard, true)
+ DO(Soft, false)
+
+ Self->BarrierServer->ValidateBlobInvariant(barrier.TabletId, barrier.Channel);
+
+ return true;
}
void TBlobDepot::TBarrierServer::Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) {
@@ -251,4 +254,52 @@ namespace NKikimr::NBlobDepot {
}
}
+ void TBlobDepot::TBarrierServer::ValidateBlobInvariant(ui64 tabletId, ui8 channel) {
+#ifndef NDEBUG
+ for (const bool hard : {true, false}) {
+ const auto it = Barriers.find(std::make_tuple(tabletId, channel));
+ Y_VERIFY(it != Barriers.end());
+ const TBarrier& barrier = it->second;
+ const TGenStep& barrierGenStep = hard ? barrier.Hard : barrier.Soft;
+ const TData::TKey first(TLogoBlobID(tabletId, 0, 0, channel, 0, 0));
+ const TData::TKey last(TLogoBlobID(tabletId, barrierGenStep.Generation(), barrierGenStep.Step(),
+ channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId,
+ TLogoBlobID::MaxCrcMode));
+ Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_BEGIN | TData::EScanFlags::INCLUDE_END,
+ [&](const TData::TKey& key, const TData::TValue& value) {
+ // there must be no blobs under the hard barrier and no blobs with mode other than Keep under the soft one
+ Y_VERIFY_S(!hard && value.KeepState == NKikimrBlobDepot::EKeepState::Keep, "Key# " << key.ToString()
+ << " Value# " << value.ToString());
+ return true;
+ });
+ }
+# if 0
+ Self->Data->ScanRange(nullptr, nullptr, {}, [&](const TData::TKey& key, const TData::TValue& value) {
+ bool underSoft, underHard;
+ Self->BarrierServer->GetBlobBarrierRelation(key.GetBlobId(), &underSoft, &underHard);
+ Y_VERIFY(!underHard && (!underSoft || value.KeepState == NKikimrBlobDepot::EKeepState::Keep));
+ return true;
+ });
+# endif
+#else
+ Y_UNUSED(tabletId);
+ Y_UNUSED(channel);
+#endif
+ }
+
+ void TBlobDepot::TBarrierServer::FinishRequest(ui64 tabletId, ui8 channel, std::optional<TString> error) {
+ const auto key = std::make_tuple(tabletId, channel);
+ auto& barrier = Barriers[key];
+ Y_VERIFY(!barrier.ProcessingQ.empty());
+ auto& request = barrier.ProcessingQ.front();
+ auto [response, _] = TEvBlobDepot::MakeResponseFor(*request, Self->SelfId(),
+ error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(error));
+ TActivationContext::Send(response.release());
+ barrier.ProcessingQ.pop_front();
+ Self->Data->HandleTrash();
+ if (!barrier.ProcessingQ.empty()) {
+ Self->Execute(std::make_unique<TTxCollectGarbage>(Self, tabletId, channel));
+ }
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/garbage_collection.h b/ydb/core/blob_depot/garbage_collection.h
index 578f69ccba7..55c5fa01d6b 100644
--- a/ydb/core/blob_depot/garbage_collection.h
+++ b/ydb/core/blob_depot/garbage_collection.h
@@ -19,6 +19,7 @@ namespace NKikimr::NBlobDepot {
THashMap<std::tuple<ui64, ui8>, TBarrier> Barriers;
private:
+ class TTxAdvanceBarrier;
class TTxCollectGarbage;
public:
@@ -27,11 +28,14 @@ namespace NKikimr::NBlobDepot {
{}
void AddBarrierOnLoad(ui64 tabletId, ui8 channel, TGenStep softGenCtr, TGenStep soft, TGenStep hardGenCtr, TGenStep hard);
- void AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier, NTabletFlatExecutor::TTransactionContext& txc);
+ bool AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier, ui32& maxItems,
+ NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev);
void GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const;
void OnDataLoaded();
+ void ValidateBlobInvariant(ui64 tabletId, ui8 channel);
+
TString ToStringBarrier(ui64 tabletId, ui8 channel, bool hard) const {
if (const auto it = Barriers.find(std::make_tuple(tabletId, channel)); it == Barriers.end()) {
return "<none>";
@@ -49,6 +53,8 @@ namespace NKikimr::NBlobDepot {
callback(tabletId, channel, value.SoftGenCtr, value.Soft, value.HardGenCtr, value.Hard);
}
}
+
+ void FinishRequest(ui64 tabletId, ui8 channel, std::optional<TString> error);
};
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/given_id_range.cpp b/ydb/core/blob_depot/given_id_range.cpp
index f66a1f3fc05..c65ca410cea 100644
--- a/ydb/core/blob_depot/given_id_range.cpp
+++ b/ydb/core/blob_depot/given_id_range.cpp
@@ -145,7 +145,12 @@ namespace NKikimr::NBlobDepot {
myChunk -= otherChunk;
NumAvailableItems -= otherChunk.Count();
- ++myIt;
+ if (myChunk.Count()) {
+ ++myIt;
+ } else {
+ myIt = Ranges.erase(myIt);
+ }
+
++otherIt;
}
}
@@ -155,7 +160,11 @@ namespace NKikimr::NBlobDepot {
void TGivenIdRange::Output(IOutputStream& s) const {
s << "{";
- for (const auto& [key, chunk] : Ranges) {
+ for (auto it = Ranges.begin(); it != Ranges.end(); ++it) {
+ const auto& [key, chunk] = *it;
+ if (it != Ranges.begin()) {
+ s << " ";
+ }
s << key << ":";
for (ui32 i = 0; i < chunk.Size(); ++i) {
s << int(chunk[i]);
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index fe83420eb30..173d7eec958 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -31,19 +31,8 @@ namespace NKikimr::NBlobDepot {
auto *responseItem = responseRecord->AddItems();
responseItem->SetStatus(NKikimrProto::OK);
- NKikimrBlobDepot::TValue value;
- if (item.HasMeta()) {
- value.SetMeta(item.GetMeta());
- }
- if (const auto keepState = Self->Data->GetKeepState(key); keepState != NKikimrBlobDepot::EKeepState::Default) {
- value.SetKeepState(keepState);
- }
- auto *chain = value.AddValueChain();
- auto *locator = chain->MutableLocator();
- locator->CopyFrom(item.GetBlobLocator());
-
- const auto blobSeqId = TBlobSeqId::FromProto(locator->GetBlobSeqId());
- const bool canBeCollected = Self->Data->CanBeCollected(locator->GetGroupId(), blobSeqId);
+ const auto blobSeqId = TBlobSeqId::FromProto(item.GetBlobLocator().GetBlobSeqId());
+ const bool canBeCollected = Self->Data->CanBeCollected(item.GetBlobLocator().GetGroupId(), blobSeqId);
if (blobSeqId.Generation == generation) {
// check for internal sanity -- we can't issue barriers on given ids without confirmed trimming
@@ -60,20 +49,15 @@ namespace NKikimr::NBlobDepot {
}
TString error;
- if (!CheckKeyAgainstBarrier(key, value, &error)) {
+ if (!CheckKeyAgainstBarrier(key, &error)) {
responseItem->SetStatus(NKikimrProto::ERROR);
responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << key.ToString()
<< " is being put beyond the barrier: " << error);
continue;
}
- TString valueData;
- const bool success = value.SerializeToString(&valueData);
- Y_VERIFY(success);
-
- db.Table<Schema::Data>().Key(item.GetKey()).Update<Schema::Data::Value>(valueData);
-
- Self->Data->PutKey(std::move(key), TData::TValue(std::move(value)));
+ // obtain value
+ Self->Data->UpdateKey(key, item, txc, this);
}
return true;
@@ -97,26 +81,29 @@ namespace NKikimr::NBlobDepot {
}
}
- bool CheckKeyAgainstBarrier(const TData::TKey& key, const NKikimrBlobDepot::TValue& value,
- TString *error) {
+ bool CheckKeyAgainstBarrier(const TData::TKey& key, TString *error) {
const auto& v = key.AsVariant();
if (const auto *id = std::get_if<TLogoBlobID>(&v)) {
bool underSoft, underHard;
Self->BarrierServer->GetBlobBarrierRelation(*id, &underSoft, &underHard);
if (underHard) {
- *error = TStringBuilder() << "under barrier# " << Self->BarrierServer->ToStringBarrier(
+ *error = TStringBuilder() << "under hard barrier# " << Self->BarrierServer->ToStringBarrier(
id->TabletID(), id->Channel(), true);
return false;
- } else if (underSoft && value.GetKeepState() != NKikimrBlobDepot::EKeepState::Keep) {
- *error = TStringBuilder() << "under barrier# " << Self->BarrierServer->ToStringBarrier(
- id->TabletID(), id->Channel(), false);
- return false;
+ } else if (underSoft) {
+ const TData::TValue *value = Self->Data->FindKey(key);
+ if (!value || value->KeepState != NKikimrBlobDepot::EKeepState::Keep) {
+ *error = TStringBuilder() << "under soft barrier# " << Self->BarrierServer->ToStringBarrier(
+ id->TabletID(), id->Channel(), false);
+ return false;
+ }
}
}
return true;
}
void Complete(const TActorContext&) override {
+ Self->Data->CommitTrash(this);
Self->Data->HandleTrash();
TActivationContext::Send(Response.release());
}
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
index 53d8f837720..15afa7787e1 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
@@ -459,6 +459,7 @@ namespace NKikimr::NStorage {
fFunc(TEvBlobStorage::TEvRange::EventType, HandleForwarded);
fFunc(TEvBlobStorage::TEvCollectGarbage::EventType, HandleForwarded);
fFunc(TEvBlobStorage::TEvStatus::EventType, HandleForwarded);
+ fFunc(TEvBlobStorage::TEvAssimilate::EventType, HandleForwarded);
fFunc(TEvBlobStorage::TEvBunchOfEvents::EventType, HandleForwarded);
fFunc(TEvRequestProxySessionsState::EventType, HandleForwarded);
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp
index 8613e05c0ef..df61982aa02 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp
@@ -28,6 +28,10 @@ void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEve
HANDLE_EVENT(TEvVGetResult, EvVGetResultSent)
#undef HANDLE_EVENT
+
+ case TEvBlobStorage::EvVAssimilateResult: // override channel for assimilation result
+ channel = TInterconnectChannels::IC_BLOBSTORAGE_ASYNC_DATA;
+ break;
}
auto event = std::make_unique<IEventHandle>(recipient, ctx.SelfID, ev, IEventHandle::MakeFlags(channel, 0), cookie);
diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
index 182a2c513c4..d9c2625e6bb 100644
--- a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
+++ b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp
@@ -283,11 +283,12 @@ public:
"Unexpected EvGetResult.",
(KeyValue, TabletInfo->TabletID),
(Status, result->Status),
+ (Id, response.Id),
(ResponseStatus, response.Status),
- (Deadline, IntermediateResult->Deadline.MilliSeconds()),
- (Now, TActivationContext::Now().MilliSeconds()),
+ (Deadline, IntermediateResult->Deadline),
+ (Now, TActivationContext::Now()),
(SentAt, batch.SentTime),
- (GotAt, IntermediateResult->Stat.IntermediateCreatedAt.MilliSeconds()),
+ (GotAt, IntermediateResult->Stat.IntermediateCreatedAt),
(ErrorReason, result->ErrorReason));
hasErrorResponses = true;
}