diff options
author | alexvru <[email protected]> | 2022-08-30 09:26:02 +0300 |
---|---|---|
committer | alexvru <[email protected]> | 2022-08-30 09:26:02 +0300 |
commit | f1dc9cbd2577c2a61181b0095146755c1f8b8c35 (patch) | |
tree | 185b91826c24326591166d5e16b650d7b71962ed | |
parent | d539d1c8b29f406e83140be05ced18eac998b4f4 (diff) |
BlobDepot work in progress
-rw-r--r-- | ydb/core/blob_depot/assimilator.cpp | 157 | ||||
-rw-r--r-- | ydb/core/blob_depot/assimilator.h | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 300 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 72 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_load.cpp | 7 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_resolve.cpp | 8 | ||||
-rw-r--r-- | ydb/core/blob_depot/garbage_collection.cpp | 313 | ||||
-rw-r--r-- | ydb/core/blob_depot/garbage_collection.h | 8 | ||||
-rw-r--r-- | ydb/core/blob_depot/given_id_range.cpp | 13 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_commit_blob_seq.cpp | 43 | ||||
-rw-r--r-- | ydb/core/blobstorage/nodewarden/node_warden_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/common/vdisk_response.cpp | 4 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_read_request.cpp | 7 |
13 files changed, 586 insertions, 348 deletions
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index f7fc00806b1..84dae88a4da 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -42,6 +42,8 @@ namespace NKikimr::NBlobDepot { } void TAssimilator::PassAway() { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT52, "TAssimilator::PassAway", (Id, Self->GetLogId())); + TActorBootstrapped::PassAway(); } STATEFN(TAssimilator::StateFunc) { @@ -51,6 +53,7 @@ namespace NKikimr::NBlobDepot { switch (const ui32 type = ev->GetTypeRewrite()) { hFunc(TEvBlobStorage::TEvAssimilateResult, Handle); + hFunc(TEvents::TEvUndelivered, Handle); hFunc(TEvBlobStorage::TEvGetResult, Handle); hFunc(TEvBlobStorage::TEvPutResult, Handle); hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle); @@ -58,6 +61,7 @@ namespace NKikimr::NBlobDepot { hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); hFunc(TEvBlobStorage::TEvControllerGroupDecommittedResponse, Handle); cFunc(TEvPrivate::EvResume, Action); + cFunc(TEvents::TSystem::Poison, PassAway); default: Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type); @@ -79,15 +83,28 @@ namespace NKikimr::NBlobDepot { } void TAssimilator::SendAssimilateRequest() { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT53, "TAssimilator::SendAssimilateRequest", (Id, Self->GetLogId()), + (SelfId, SelfId()), (DecommitGroupId, Self->Config.GetDecommitGroupId())); SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), new TEvBlobStorage::TEvAssimilate(SkipBlocksUpTo, SkipBarriersUpTo, SkipBlobsUpTo)); } + void TAssimilator::Handle(TEvents::TEvUndelivered::TPtr ev) { + STLOG(PRI_ERROR, BLOB_DEPOT, BDT55, "received TEvUndelivered", (Id, Self->GetLogId()), (Sender, ev->Sender), + (Cookie, ev->Cookie), (Type, ev->Get()->SourceType), (Reason, ev->Get()->Reason)); + TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvPrivate::EvResume, 0, SelfId(), {}, + nullptr, 0)); + } + void TAssimilator::Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev) { class TTxPutAssimilatedData : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - TAssimilator *Self; + TAssimilator* const Self; std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> Ev; + bool BlocksFinished = false; + bool BarriersFinished = false; + bool UnblockRegisterActorQ = false; + bool MoreData = false; public: TTxPutAssimilatedData(TAssimilator *self, TEvBlobStorage::TEvAssimilateResult::TPtr ev) @@ -96,85 +113,102 @@ namespace NKikimr::NBlobDepot { , Ev(ev->Release().Release()) {} + TTxPutAssimilatedData(TTxPutAssimilatedData& predecessor) + : TTransactionBase(predecessor.Self->Self) + , Self(predecessor.Self) + , Ev(std::move(predecessor.Ev)) + , BlocksFinished(predecessor.BlocksFinished) + , BarriersFinished(predecessor.BarriersFinished) + {} + bool Execute(TTransactionContext& txc, const TActorContext&) override { NIceDb::TNiceDb db(txc.DB); - bool blocksFinished = false; - bool barriersFinished = false; - - if (const auto& blocks = Ev->Blocks; !blocks.empty()) { - Self->SkipBlocksUpTo = blocks.back().TabletId; - } - if (const auto& barriers = Ev->Barriers; !barriers.empty()) { - Self->SkipBarriersUpTo = {barriers.back().TabletId, barriers.back().Channel}; - } - if (const auto& blobs = Ev->Blobs; !blobs.empty()) { - Self->SkipBlobsUpTo = blobs.back().Id; + const bool wasEmpty = Ev->Blocks.empty() && Ev->Barriers.empty() && Ev->Blobs.empty(); + if (wasEmpty) { + BlocksFinished = BarriersFinished = true; } - for (const auto& block : Ev->Blocks) { + ui32 maxItems = 10'000; + for (auto& blocks = Ev->Blocks; maxItems && !blocks.empty(); blocks.pop_front(), --maxItems) { + auto& block = blocks.front(); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT31, "assimilated block", (Id, Self->Self->GetLogId()), (Block, block)); Self->Self->BlocksManager->AddBlockOnDecommit(block, txc); + Self->SkipBlocksUpTo.emplace(block.TabletId); } - for (const auto& barrier : Ev->Barriers) { + for (auto& barriers = Ev->Barriers; maxItems && !barriers.empty(); barriers.pop_front(), --maxItems) { + auto& barrier = barriers.front(); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT32, "assimilated barrier", (Id, Self->Self->GetLogId()), (Barrier, barrier)); - Self->Self->BarrierServer->AddBarrierOnDecommit(barrier, txc); - blocksFinished = true; // there will be no blocks for sure + if (!Self->Self->BarrierServer->AddBarrierOnDecommit(barrier, maxItems, txc, this)) { + Y_VERIFY(!maxItems); + break; + } + Self->SkipBarriersUpTo.emplace(barrier.TabletId, barrier.Channel); + BlocksFinished = true; // there will be no blocks for sure } - for (const auto& blob : Ev->Blobs) { + for (auto& blobs = Ev->Blobs; maxItems && !blobs.empty(); blobs.pop_front(), --maxItems) { + auto& blob = blobs.front(); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT33, "assimilated blob", (Id, Self->Self->GetLogId()), (Blob, blob)); - Self->Self->Data->AddDataOnDecommit(blob, txc); - blocksFinished = barriersFinished = true; // no blocks and no more barriers - } - - auto& decommitState = Self->Self->DecommitState; - const auto decommitStateOnEntry = decommitState; - if (blocksFinished && decommitState < EDecommitState::BlocksFinished) { - decommitState = EDecommitState::BlocksFinished; - UnblockRegisterActorQ = true; - } - if (barriersFinished && decommitState < EDecommitState::BarriersFinished) { - decommitState = EDecommitState::BarriersFinished; + Self->Self->Data->AddDataOnDecommit(blob, txc, this); + Self->SkipBlobsUpTo.emplace(blob.Id); + Self->Self->Data->LastAssimilatedBlobId = blob.Id; + BlocksFinished = BarriersFinished = true; // no blocks and no more barriers } - const bool done = Ev->Blocks.empty() && Ev->Barriers.empty() && Ev->Blobs.empty(); - if (done && decommitState < EDecommitState::BlobsFinished) { - decommitState = EDecommitState::BlobsFinished; - } - - db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update( - NIceDb::TUpdate<Schema::Config::DecommitState>(decommitState), - NIceDb::TUpdate<Schema::Config::AssimilatorState>(Self->SerializeAssimilatorState()) - ); - auto toString = [](EDecommitState state) { - switch (state) { - case EDecommitState::Default: return "Default"; - case EDecommitState::BlocksFinished: return "BlocksFinished"; - case EDecommitState::BarriersFinished: return "BarriersFinished"; - case EDecommitState::BlobsFinished: return "BlobsFinished"; - case EDecommitState::BlobsCopied: return "BlobsCopied"; - case EDecommitState::Done: return "Done"; + if (Ev->Blocks.empty() && Ev->Barriers.empty() && Ev->Blobs.empty()) { + auto& decommitState = Self->Self->DecommitState; + const auto decommitStateOnEntry = decommitState; + if (BlocksFinished && decommitState < EDecommitState::BlocksFinished) { + decommitState = EDecommitState::BlocksFinished; + UnblockRegisterActorQ = true; + } + if (BarriersFinished && decommitState < EDecommitState::BarriersFinished) { + decommitState = EDecommitState::BarriersFinished; + } + if (wasEmpty && decommitState < EDecommitState::BlobsFinished) { + decommitState = EDecommitState::BlobsFinished; } - }; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT47, "decommit state change", (Id, Self->Self->GetLogId()), - (From, toString(decommitStateOnEntry)), (To, toString(decommitState)), - (UnblockRegisterActorQ, UnblockRegisterActorQ)); - if (!Ev->Blobs.empty()) { - Self->Self->Data->LastAssimilatedBlobId = Ev->Blobs.back().Id; + db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update( + NIceDb::TUpdate<Schema::Config::DecommitState>(decommitState), + NIceDb::TUpdate<Schema::Config::AssimilatorState>(Self->SerializeAssimilatorState()) + ); + + auto toString = [](EDecommitState state) { + switch (state) { + case EDecommitState::Default: return "Default"; + case EDecommitState::BlocksFinished: return "BlocksFinished"; + case EDecommitState::BarriersFinished: return "BarriersFinished"; + case EDecommitState::BlobsFinished: return "BlobsFinished"; + case EDecommitState::BlobsCopied: return "BlobsCopied"; + case EDecommitState::Done: return "Done"; + } + }; + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT47, "decommit state change", (Id, Self->Self->GetLogId()), + (From, toString(decommitStateOnEntry)), (To, toString(decommitState)), + (UnblockRegisterActorQ, UnblockRegisterActorQ)); + } else { + MoreData = true; } return true; } void Complete(const TActorContext&) override { - if (UnblockRegisterActorQ) { - STLOG(PRI_INFO, BLOB_DEPOT, BDT35, "blocks assimilation complete", (Id, Self->Self->GetLogId()), - (DecommitGroupId, Self->Self->Config.GetDecommitGroupId())); - Self->Self->ProcessRegisterAgentQ(); - } + Self->Self->Data->CommitTrash(this); + + if (MoreData) { + Self->Self->Execute(std::make_unique<TTxPutAssimilatedData>(*this)); + } else { + if (UnblockRegisterActorQ) { + STLOG(PRI_INFO, BLOB_DEPOT, BDT35, "blocks assimilation complete", (Id, Self->Self->GetLogId()), + (DecommitGroupId, Self->Self->Config.GetDecommitGroupId())); + Self->Self->ProcessRegisterAgentQ(); + } - Self->Action(); + Self->Action(); + } } }; @@ -183,6 +217,8 @@ namespace NKikimr::NBlobDepot { void TAssimilator::ScanDataForCopying() { const bool fromTheBeginning = !LastScannedKey; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT54, "TAssimilator::ScanDataForCopying", (Id, Self->GetLogId()), + (LastScannedKey, LastScannedKey)); TData::TKey lastScannedKey; if (LastScannedKey) { @@ -241,7 +277,7 @@ namespace NKikimr::NBlobDepot { for (ui32 i = 0; i < msg.ResponseSz; ++i) { auto& resp = msg.Responses[i]; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT34, "got TEvGetResult", (Id, Self->GetLogId()), (BlobId, resp.Id), - (Status, resp.Status)); + (Status, resp.Status), (NumPutsInFlight, NumPutsInFlight)); if (resp.Status == NKikimrProto::OK) { auto ev = std::make_unique<TEvBlobStorage::TEvPut>(resp.Id, resp.Buffer, TInstant::Max()); ev->Decommission = true; @@ -256,7 +292,8 @@ namespace NKikimr::NBlobDepot { void TAssimilator::Handle(TEvBlobStorage::TEvPutResult::TPtr ev) { auto& msg = *ev->Get(); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "got TEvPutResult", (Id, Self->GetLogId()), (Msg, msg)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "got TEvPutResult", (Id, Self->GetLogId()), (Msg, msg), + (NumPutsInFlight, NumPutsInFlight)); if (msg.Status == NKikimrProto::OK) { const size_t numErased = NeedfulBlobs.erase(msg.Id); Y_VERIFY(numErased == 1); diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h index 2a1a7117288..aa6d4088d28 100644 --- a/ydb/core/blob_depot/assimilator.h +++ b/ydb/core/blob_depot/assimilator.h @@ -43,6 +43,7 @@ namespace NKikimr::NBlobDepot { private: void Action(); void SendAssimilateRequest(); + void Handle(TEvents::TEvUndelivered::TPtr ev); void Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev); void ScanDataForCopying(); void Handle(TEvBlobStorage::TEvGetResult::TPtr ev); diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index a13ea999d60..e5ecfc81579 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -5,9 +5,125 @@ namespace NKikimr::NBlobDepot { using TData = TBlobDepot::TData; - NKikimrBlobDepot::EKeepState TData::GetKeepState(const TKey& key) const { + enum class EUpdateOutcome { + CHANGE, + NO_CHANGE, + DROP + }; + + template<typename T, typename... TArgs> + bool TData::UpdateKey(TKey key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie, T&& callback, TArgs&&... args) { + bool underSoft = false, underHard = false; + auto var = key.AsVariant(); + if (auto *id = std::get_if<TLogoBlobID>(&var)) { + Self->BarrierServer->GetBlobBarrierRelation(*id, &underSoft, &underHard); + } + if (underHard || underSoft) { + if (const auto it = Data.find(key); it == Data.end()) { + return false; // no such key existed and will not be created as it hits the barrier + } else { + Y_VERIFY_S(!underHard && it->second.KeepState == NKikimrBlobDepot::EKeepState::Keep, + "barrier invariant failed Key# " << key.ToString() << " Value# " << it->second.ToString()); + } + } + + const auto [it, inserted] = Data.try_emplace(std::move(key), std::forward<TArgs>(args)...); + { + auto& [key, value] = *it; + Y_VERIFY(!underHard); + Y_VERIFY(!underSoft || !inserted); + + std::vector<TLogoBlobID> deleteQ; + + if (!inserted) { + EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) { + const auto it = RefCount.find(id); + Y_VERIFY(it != RefCount.end()); + if (!--it->second) { + deleteQ.push_back(id); + } + }); + } + + EUpdateOutcome outcome = callback(value, inserted); + + Y_VERIFY(!inserted || outcome != EUpdateOutcome::NO_CHANGE); + if (underSoft && value.KeepState != NKikimrBlobDepot::EKeepState::Keep) { + outcome = EUpdateOutcome::DROP; + } + + EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) { + const auto [it, inserted] = RefCount.try_emplace(id, 1); + if (inserted) { + // first mention of this id + auto& record = GetRecordsPerChannelGroup(id); + const auto [_, inserted] = record.Used.insert(id); + Y_VERIFY(inserted); + AccountBlob(id, 1); + + // blob is first mentioned and deleted as well + if (outcome == EUpdateOutcome::DROP) { + it->second = 0; + deleteQ.push_back(id); + } + } else if (outcome != EUpdateOutcome::DROP) { + ++it->second; + } + }); + + for (const TLogoBlobID& id : deleteQ) { + const auto it = RefCount.find(id); + Y_VERIFY(it != RefCount.end()); + if (!it->second) { + InFlightTrash.emplace(cookie, id); + NIceDb::TNiceDb(txc.DB).Table<Schema::Trash>().Key(id.AsBinaryString()).Update(); + RefCount.erase(it); + } + } + + auto row = NIceDb::TNiceDb(txc.DB).Table<Schema::Data>().Key(key.MakeBinaryKey()); + switch (outcome) { + case EUpdateOutcome::DROP: + Data.erase(it); + row.Delete(); + return true; + + case EUpdateOutcome::CHANGE: + row.template Update<Schema::Data::Value>(value.SerializeToString()); + return true; + + case EUpdateOutcome::NO_CHANGE: + return false; + } + } + } + + const TData::TValue *TData::FindKey(const TKey& key) const { const auto it = Data.find(key); - return it != Data.end() ? it->second.KeepState : NKikimrBlobDepot::EKeepState::Default; + return it != Data.end() ? &it->second : nullptr; + } + + void TData::UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item, + NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "UpdateKey", (Id, Self->GetLogId()), (Key, key), (Item, item)); + UpdateKey(key, txc, cookie, [&](TValue& value, bool inserted) { + if (!inserted) { // update value items + value.Meta = item.GetMeta(); + value.Public = false; + value.Unconfirmed = item.GetUnconfirmed(); + + // update it to keep new blob locator + value.ValueChain.Clear(); + auto *chain = value.ValueChain.Add(); + auto *locator = chain->MutableLocator(); + locator->CopyFrom(item.GetBlobLocator()); + + // reset original blob id, if any + value.OriginalBlobId.reset(); + } + + return EUpdateOutcome::CHANGE; + }, item); } TData::TRecordsPerChannelGroup& TData::GetRecordsPerChannelGroup(TLogoBlobID id) { @@ -19,50 +135,44 @@ namespace NKikimr::NBlobDepot { return it->second; } - void TData::AddDataOnLoad(TKey key, TString value) { - if (Data.contains(key)) { - return; // we are racing with the offline load procedure -- skip this key, it is already new in memory - } - + void TData::AddDataOnLoad(TKey key, TString value, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { NKikimrBlobDepot::TValue proto; const bool success = proto.ParseFromString(value); Y_VERIFY(success); - PutKey(std::move(key), TValue(std::move(proto))); - } - - void TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, - NTabletFlatExecutor::TTransactionContext& txc) { - TKey key(blob.Id); - - bool underSoft, underHard; - Self->BarrierServer->GetBlobBarrierRelation(blob.Id, &underSoft, &underHard); - - // calculate keep state for this blob - const auto it = Data.find(key); - const NKikimrBlobDepot::EKeepState keepState = Max(it != Data.end() ? it->second.KeepState : NKikimrBlobDepot::EKeepState::Default, - blob.DoNotKeep ? NKikimrBlobDepot::EKeepState::DoNotKeep : - blob.Keep ? NKikimrBlobDepot::EKeepState::Keep : NKikimrBlobDepot::EKeepState::Default); - - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "AddDataOnDecommit", (Id, Self->GetLogId()), (Blob, blob), - (UnderHard, underHard), (UnderSoft, underSoft), (KeepState, keepState)); - if (underHard || (underSoft && keepState != NKikimrBlobDepot::EKeepState::Keep)) { - return; // we can skip this blob as it is already being collected - } + UpdateKey(std::move(key), txc, cookie, [&](TValue& value, bool inserted) { + if (!inserted) { // do some merge logic + value.KeepState = Max(value.KeepState, proto.GetKeepState()); + if (value.ValueChain.empty() && proto.ValueChainSize()) { + value.ValueChain.CopyFrom(proto.GetValueChain()); + value.OriginalBlobId.reset(); + } + } - NKikimrBlobDepot::TValue value; - value.SetKeepState(keepState); - value.SetUnconfirmed(true); - LogoBlobIDFromLogoBlobID(blob.Id, value.MutableOriginalBlobId()); + return EUpdateOutcome::CHANGE; + }, std::move(proto)); + } - TString valueData; - const bool success = value.SerializeToString(&valueData); - Y_VERIFY(success); + void TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, + NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { + UpdateKey(TKey(blob.Id), txc, cookie, [&](TValue& value, bool inserted) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "AddDataOnDecommit", (Id, Self->GetLogId()), (Blob, blob), + (Value, value), (Inserted, inserted)); + + // update keep state if necessary + if (blob.DoNotKeep && value.KeepState < NKikimrBlobDepot::EKeepState::DoNotKeep) { + value.KeepState = NKikimrBlobDepot::EKeepState::DoNotKeep; + } else if (blob.Keep && value.KeepState < NKikimrBlobDepot::EKeepState::Keep) { + value.KeepState = NKikimrBlobDepot::EKeepState::Keep; + } - NIceDb::TNiceDb db(txc.DB); - db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Update<Schema::Data::Value>(valueData); + // if there is not value chain for this blob, map it to the original blob id + if (value.ValueChain.empty()) { + value.OriginalBlobId = blob.Id; + } - PutKey(key, TValue(std::move(value))); + return EUpdateOutcome::CHANGE; + }); } void TData::AddTrashOnLoad(TLogoBlobID id) { @@ -80,84 +190,37 @@ namespace NKikimr::NBlobDepot { record.LastConfirmedGenStep = confirmedGenStep; } - void TData::PutKey(TKey key, TValue&& data) { - ui64 referencedBytes = 0; - - EnumerateBlobsForValueChain(data.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32 /*begin*/, ui32 /*end*/) { - if (!RefCount[id]++) { - // first mention of this id - auto& record = GetRecordsPerChannelGroup(id); - const auto [_, inserted] = record.Used.insert(id); - Y_VERIFY(inserted); - AccountBlob(id, 1); - } - referencedBytes += id.BlobSize(); - }); - - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "PutKey", (Id, Self->GetLogId()), (Key, key), (Value, data)); - - const auto [it, inserted] = Data.try_emplace(std::move(key), std::move(data)); - if (!inserted) { - it->second = std::move(data); - } + bool TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState, + NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { + return UpdateKey(std::move(key), txc, cookie, [&](TValue& value, bool inserted) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT51, "UpdateKeepState", (Id, Self->GetLogId()), (Key, key), + (KeepState, keepState), (Value, value)); + if (inserted) { + return EUpdateOutcome::CHANGE; + } else if (value.KeepState < keepState) { + value.KeepState = keepState; + return EUpdateOutcome::CHANGE; + } else { + return EUpdateOutcome::NO_CHANGE; + } + }, keepState); } - std::optional<TString> TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT51, "UpdateKeepState", (Id, Self->GetLogId()), (Key, key), - (KeepState, keepState)); - - const auto [it, inserted] = Data.try_emplace(std::move(key), TValue(keepState)); - if (!inserted) { - if (keepState <= it->second.KeepState) { - return std::nullopt; - } - it->second.KeepState = keepState; - } - return ToValueProto(it->second); - } - - void TData::DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie) { - const auto it = Data.find(key); - Y_VERIFY(it != Data.end()); - TValue& value = it->second; - EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32 /*begin*/, ui32 /*end*/) { - const auto it = RefCount.find(id); - Y_VERIFY(it != RefCount.end()); - if (!--it->second) { - InFlightTrash.emplace(cookie, id); - RefCount.erase(it); - updateTrash(id); - } + void TData::DeleteKey(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT14, "DeleteKey", (Id, Self->GetLogId()), (Key, key)); + UpdateKey(key, txc, cookie, [&](TValue&, bool inserted) { + Y_VERIFY(!inserted); + return EUpdateOutcome::DROP; }); - Data.erase(it); } void TData::CommitTrash(void *cookie) { - auto range = InFlightTrash.equal_range(cookie); - for (auto it = range.first; it != range.second; ++it) { + auto [first, last] = InFlightTrash.equal_range(cookie); + for (auto it = first; it != last; ++it) { auto& record = GetRecordsPerChannelGroup(it->second); record.MoveToTrash(this, it->second); } - InFlightTrash.erase(range.first, range.second); - } - - TString TData::ToValueProto(const TValue& value) { - NKikimrBlobDepot::TValue proto; - if (value.Meta) { - proto.SetMeta(value.Meta); - } - proto.MutableValueChain()->CopyFrom(value.ValueChain); - if (proto.GetKeepState() != value.KeepState) { - proto.SetKeepState(value.KeepState); - } - if (proto.GetPublic() != value.Public) { - proto.SetPublic(value.Public); - } - - TString s; - const bool success = proto.SerializeToString(&s); - Y_VERIFY(success); - return s; + InFlightTrash.erase(first, last); } void TData::HandleTrash() { @@ -369,6 +432,29 @@ namespace NKikimr::NBlobDepot { it->second.ClearInFlight(this); } + bool TData::OnBarrierShift(ui64 tabletId, ui8 channel, bool hard, TGenStep previous, TGenStep current, ui32& maxItems, + NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { + const TData::TKey first(TLogoBlobID(tabletId, previous.Generation(), previous.Step(), channel, 0, 0)); + const TData::TKey last(TLogoBlobID(tabletId, current.Generation(), current.Step(), channel, + TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode)); + + bool finished = true; + Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_END, [&](auto& key, auto& value) { + if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) { + if (maxItems) { + Self->Data->DeleteKey(key, txc, cookie); + --maxItems; + } else { + finished = false; + return false; + } + } + return true; + }); + + return finished; + } + void TData::AccountBlob(TLogoBlobID id, bool add) { // account record const ui32 groupId = Self->Info()->GroupFor(id.Channel(), id.Generation()); diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 7a4c44c2cdb..d94b8e6a318 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -210,12 +210,12 @@ namespace NKikimr::NBlobDepot { struct TValue { TString Meta; TValueChain ValueChain; - NKikimrBlobDepot::EKeepState KeepState; - bool Public; - bool Unconfirmed; + NKikimrBlobDepot::EKeepState KeepState = NKikimrBlobDepot::EKeepState::Default; + bool Public = false; + bool Unconfirmed = false; std::optional<TLogoBlobID> OriginalBlobId; - TValue() = delete; + TValue() = default; TValue(const TValue&) = delete; TValue(TValue&&) = default; @@ -233,12 +233,52 @@ namespace NKikimr::NBlobDepot { : std::nullopt) {} + explicit TValue(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item) + : Meta(item.GetMeta()) + , Public(false) + , Unconfirmed(item.GetUnconfirmed()) + { + auto *chain = ValueChain.Add(); + auto *locator = chain->MutableLocator(); + locator->CopyFrom(item.GetBlobLocator()); + } + explicit TValue(NKikimrBlobDepot::EKeepState keepState) : KeepState(keepState) , Public(false) , Unconfirmed(false) {} + void SerializeToProto(NKikimrBlobDepot::TValue *proto) const { + if (Meta) { + proto->SetMeta(Meta); + } + if (!ValueChain.empty()) { + proto->MutableValueChain()->CopyFrom(ValueChain); + } + if (KeepState != proto->GetKeepState()) { + proto->SetKeepState(KeepState); + } + if (Public != proto->GetPublic()) { + proto->SetPublic(Public); + } + if (Unconfirmed != proto->GetUnconfirmed()) { + proto->SetUnconfirmed(Unconfirmed); + } + if (OriginalBlobId) { + LogoBlobIDFromLogoBlobID(*OriginalBlobId, proto->MutableOriginalBlobId()); + } + } + + TString SerializeToString() const { + NKikimrBlobDepot::TValue proto; + SerializeToProto(&proto); + TString s; + const bool success = proto.SerializeToString(&s); + Y_VERIFY(success); + return s; + } + TString ToString() const { TStringStream s; Output(s); @@ -359,24 +399,32 @@ namespace NKikimr::NBlobDepot { } } - NKikimrBlobDepot::EKeepState GetKeepState(const TKey& key) const; + const TValue *FindKey(const TKey& key) const; + + template<typename T, typename... TArgs> + bool UpdateKey(TKey key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie, T&& callback, TArgs&&... args); + + void UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item, + NTabletFlatExecutor::TTransactionContext& txc, void *cookie); TRecordsPerChannelGroup& GetRecordsPerChannelGroup(TLogoBlobID id); - void AddDataOnLoad(TKey key, TString value); - void AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc); + void AddDataOnLoad(TKey key, TString value, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); + void AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, + NTabletFlatExecutor::TTransactionContext& txc, void *cookie); void AddTrashOnLoad(TLogoBlobID id); void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep); - void PutKey(TKey key, TValue&& data); - - std::optional<TString> UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState); - void DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie); + bool UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState, + NTabletFlatExecutor::TTransactionContext& txc, void *cookie); + void DeleteKey(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); void CommitTrash(void *cookie); void HandleTrash(); void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev); void OnPushNotifyResult(TEvBlobDepot::TEvPushNotifyResult::TPtr ev); void OnCommitConfirmedGC(ui8 channel, ui32 groupId); + bool OnBarrierShift(ui64 tabletId, ui8 channel, bool hard, TGenStep previous, TGenStep current, ui32& maxItems, + NTabletFlatExecutor::TTransactionContext& txc, void *cookie); void AccountBlob(TLogoBlobID id, bool add); @@ -384,8 +432,6 @@ namespace NKikimr::NBlobDepot { void OnLeastExpectedBlobIdChange(ui8 channel); - static TString ToValueProto(const TValue& value); - template<typename TCallback> void EnumerateRefCount(TCallback&& callback) { for (const auto& [key, value] : RefCount) { diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp index 1356ca11c11..b39f806757f 100644 --- a/ydb/core/blob_depot/data_load.cpp +++ b/ydb/core/blob_depot/data_load.cpp @@ -36,6 +36,7 @@ namespace NKikimr::NBlobDepot { TLoadState LoadState; std::unique_ptr<TTxLoad> SuccessorTx; + TTransactionContext *Txc; public: TTxLoad(TBlobDepot *self, TLoadState loadState = TLoadDataBegin{}) @@ -51,6 +52,8 @@ namespace NKikimr::NBlobDepot { bool Execute(TTransactionContext& txc, const TActorContext&) override { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT28, "TData::TTxLoad::Execute", (Id, Self->GetLogId())); + Txc = &txc; + NIceDb::TNiceDb db(txc.DB); bool progress = false; auto visitor = [&](auto& item) { return ExecuteLoad(db, item, progress); }; @@ -117,7 +120,7 @@ namespace NKikimr::NBlobDepot { template<typename T> void ProcessRow(T&& row, Schema::Data*) { auto key = TData::TKey::FromBinaryKey(row.template GetValue<Schema::Data::Key>(), Self->Config); - Self->Data->AddDataOnLoad(key, row.template GetValue<Schema::Data::Value>()); + Self->Data->AddDataOnLoad(key, row.template GetValue<Schema::Data::Value>(), *Txc, this); Y_VERIFY(!Self->Data->LastLoadedKey || *Self->Data->LastLoadedKey < key); Self->Data->LastLoadedKey = std::move(key); } @@ -141,6 +144,8 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT29, "TData::TTxLoad::Complete", (Id, Self->GetLogId()), (SuccessorTx, bool(SuccessorTx)), (LoadState.index, LoadState.index())); + Self->Data->CommitTrash(this); + if (SuccessorTx) { Self->Execute(std::move(SuccessorTx)); } else { diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp index f8d9f897b7e..4f9464d6d30 100644 --- a/ydb/core/blob_depot/data_resolve.cpp +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -111,7 +111,7 @@ namespace NKikimr::NBlobDepot { if (key != LastScannedKey) { LastScannedKey = key; progress = true; - Self->Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>()); + Self->Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>(), txc, this); const bool matchBegin = !begin || (flags & EScanFlags::INCLUDE_BEGIN ? *begin <= key : *begin < key); const bool matchEnd = !end || (flags & EScanFlags::INCLUDE_END ? key <= *end : key < *end); @@ -162,6 +162,8 @@ namespace NKikimr::NBlobDepot { (Sender, Request->Sender), (Cookie, Request->Cookie), (SuccessorTx, bool(SuccessorTx)), (Outbox.size, Outbox.size())); + Self->Data->CommitTrash(this); + if (SuccessorTx) { Self->Execute(std::move(SuccessorTx)); } else { @@ -354,13 +356,15 @@ namespace NKikimr::NBlobDepot { .Id = response.Id, .Keep = response.Keep, .DoNotKeep = response.DoNotKeep - }, txc); + }, txc, this); } } return true; } void Complete(const TActorContext&) override { + Self->Data->CommitTrash(this); + auto& contexts = Self->Data->ResolveDecommitContexts; if (const auto it = contexts.find(Ev->Cookie); it != contexts.end()) { TResolveDecommitContext& context = it->second; diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp index 71119ed8b95..b90254400bc 100644 --- a/ydb/core/blob_depot/garbage_collection.cpp +++ b/ydb/core/blob_depot/garbage_collection.cpp @@ -4,6 +4,107 @@ namespace NKikimr::NBlobDepot { + using TAdvanceBarrierCallback = std::function<void(std::optional<TString>)>; + + class TBlobDepot::TBarrierServer::TTxAdvanceBarrier : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; + + const ui64 TabletId; + const ui8 Channel; + const bool Hard; + const TGenStep GenCtr; + const TGenStep CollectGenStep; + TAdvanceBarrierCallback Callback; + + bool MoreData = false; + bool Success = false; + + public: + TTxAdvanceBarrier(TBlobDepot *self, ui64 tabletId, ui8 channel, bool hard, TGenStep genCtr, TGenStep collectGenStep, + TAdvanceBarrierCallback callback) + : TTransactionBase(self) + , TabletId(tabletId) + , Channel(channel) + , Hard(hard) + , GenCtr(genCtr) + , CollectGenStep(collectGenStep) + , Callback(std::move(callback)) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + Y_VERIFY(Self->Data->IsLoaded()); + + const auto key = std::make_tuple(TabletId, Channel); + auto& barriers = Self->BarrierServer->Barriers; + const auto [it, inserted] = barriers.try_emplace(key); + if (!inserted) { + // extract existing barrier record + auto& barrier = it->second; + const TGenStep barrierGenCtr = Hard ? barrier.HardGenCtr : barrier.SoftGenCtr; + const TGenStep barrierGenStep = Hard ? barrier.Hard : barrier.Soft; + + // validate them + if (GenCtr < barrierGenCtr) { + Callback("record generation:counter is obsolete"); + return true; + } else if (GenCtr == barrierGenCtr) { + if (barrierGenStep != CollectGenStep) { + Callback("repeated command with different collect parameters received"); + return true; + } + } else if (CollectGenStep < barrierGenStep) { + Callback("decreasing barrier"); + return true; + } + } + + TBarrier& barrier = it->second; + TGenStep& barrierGenCtr = Hard ? barrier.HardGenCtr : barrier.SoftGenCtr; + TGenStep& barrierGenStep = Hard ? barrier.Hard : barrier.Soft; + + Y_VERIFY(barrierGenCtr <= GenCtr); + Y_VERIFY(barrierGenStep <= CollectGenStep); + + ui32 maxItems = MaxKeysToProcessAtOnce; + if (!Self->Data->OnBarrierShift(TabletId, Channel, Hard, barrierGenStep, CollectGenStep, maxItems, txc, this)) { + MoreData = true; + return true; + } + + barrierGenCtr = GenCtr; + barrierGenStep = CollectGenStep; + + Self->BarrierServer->ValidateBlobInvariant(TabletId, Channel); + + auto row = NIceDb::TNiceDb(txc.DB).Table<Schema::Barriers>().Key(TabletId, Channel); + if (Hard) { + row.Update( + NIceDb::TUpdate<Schema::Barriers::HardGenCtr>(ui64(GenCtr)), + NIceDb::TUpdate<Schema::Barriers::Hard>(ui64(CollectGenStep)) + ); + } else { + row.Update( + NIceDb::TUpdate<Schema::Barriers::SoftGenCtr>(ui64(GenCtr)), + NIceDb::TUpdate<Schema::Barriers::Soft>(ui64(CollectGenStep)) + ); + } + + Success = true; + return true; + } + + void Complete(const TActorContext&) override { + Self->Data->CommitTrash(this); + + if (MoreData) { + Self->Execute(std::make_unique<TTxAdvanceBarrier>(Self, TabletId, Channel, Hard, GenCtr, CollectGenStep, + std::move(Callback))); + } else if (Success) { + Callback(std::nullopt); + } + } + }; + class TBlobDepot::TBarrierServer::TTxCollectGarbage : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { std::optional<TString> Error; @@ -13,8 +114,9 @@ namespace NKikimr::NBlobDepot { const ui8 Channel; int KeepIndex = 0; int DoNotKeepIndex = 0; + ui32 NumKeysProcessed = 0; - bool Done = false; + bool MoreData = false; static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; @@ -30,69 +132,26 @@ namespace NKikimr::NBlobDepot { {} bool Execute(TTransactionContext& txc, const TActorContext&) override { - Y_VERIFY(Self->Data->IsLoaded()); - Validate(); - if (!Error) { - auto& record = Request->Get()->Record; - Done = ProcessFlags(txc, KeepIndex, record.GetKeep(), NKikimrBlobDepot::EKeepState::Keep) - && ProcessFlags(txc, DoNotKeepIndex, record.GetDoNotKeep(), NKikimrBlobDepot::EKeepState::DoNotKeep) - && ProcessBarrier(txc); - } + auto& record = Request->Get()->Record; + MoreData = !ProcessFlags(txc, KeepIndex, record.GetKeep(), NKikimrBlobDepot::EKeepState::Keep) + || !ProcessFlags(txc, DoNotKeepIndex, record.GetDoNotKeep(), NKikimrBlobDepot::EKeepState::DoNotKeep); return true; } void Complete(const TActorContext&) override { - Self->Data->CommitTrash(this); - - if (Done || Error) { - auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(), - Error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(Error)); - TActivationContext::Send(response.release()); - Barrier.ProcessingQ.pop_front(); - Self->Data->HandleTrash(); - if (!Barrier.ProcessingQ.empty()) { - Self->Execute(std::make_unique<TTxCollectGarbage>(Self, TabletId, Channel)); - } - } else { + if (MoreData) { Self->Execute(std::make_unique<TTxCollectGarbage>(Self, TabletId, Channel, KeepIndex, DoNotKeepIndex)); - } - } - - void Validate() { - // validate the command first - auto& record = Request->Get()->Record; - if (record.HasCollectGeneration() && record.HasCollectStep()) { - if (!record.HasTabletId() || !record.HasChannel() || record.GetChannel() > TLogoBlobID::MaxChannel) { - Error = "TabletId/Channel are either not set or invalid"; - } else if (!record.HasGeneration() || !record.HasPerGenerationCounter()) { - Error = "Generation/PerGenerationCounter are not set"; - } else { - const auto key = std::make_tuple(record.GetTabletId(), record.GetChannel()); - auto& barriers = Self->BarrierServer->Barriers; - if (const auto it = barriers.find(key); it != barriers.end()) { - // extract existing barrier record - auto& barrier = it->second; - const TGenStep barrierGenCtr = record.GetHard() ? barrier.HardGenCtr : barrier.SoftGenCtr; - const TGenStep barrierGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; - - // extract new parameters from protobuf - const TGenStep genCtr(record.GetGeneration(), record.GetPerGenerationCounter()); - const TGenStep collectGenStep(record.GetCollectGeneration(), record.GetCollectStep()); - - // validate them - if (genCtr < barrierGenCtr) { - Error = "record generation:counter is obsolete"; - } else if (genCtr == barrierGenCtr) { - if (barrierGenStep != collectGenStep) { - Error = "repeated command with different collect parameters received"; - } - } else if (collectGenStep < barrierGenStep) { - Error = "decreasing barrier"; - } - } - } + } else if (auto& record = Request->Get()->Record; record.HasCollectGeneration() && record.HasCollectStep()) { + Self->Execute(std::make_unique<TTxAdvanceBarrier>(Self, TabletId, Channel, record.GetHard(), + TGenStep(record.GetGeneration(), record.GetPerGenerationCounter()), + TGenStep(record.GetCollectGeneration(), record.GetCollectStep()), + [self = Self, tabletId = TabletId, channel = Channel](std::optional<TString> error) { + self->BarrierServer->FinishRequest(tabletId, channel, std::move(error)); + })); } else if (record.HasCollectGeneration() || record.HasCollectStep()) { - Error = "CollectGeneration/CollectStep set incorrectly"; + Self->BarrierServer->FinishRequest(TabletId, Channel, "incorrect CollectGeneration/CollectStep setting"); + } else { // do not collect anything here + Self->BarrierServer->FinishRequest(TabletId, Channel, std::nullopt); } } @@ -103,76 +162,10 @@ namespace NKikimr::NBlobDepot { for (; index < items.size() && NumKeysProcessed < MaxKeysToProcessAtOnce; ++index) { const auto id = LogoBlobIDFromLogoBlobID(items[index]); TData::TKey key(id); - if (const auto& value = Self->Data->UpdateKeepState(key, state)) { - db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Update<Schema::Data::Value>(*value); - ++NumKeysProcessed; - } + NumKeysProcessed += Self->Data->UpdateKeepState(key, state, txc, this); } - return index == items.size(); } - - bool ProcessBarrier(TTransactionContext& txc) { - NIceDb::TNiceDb db(txc.DB); - - const auto& record = Request->Get()->Record; - if (record.HasCollectGeneration() && record.HasCollectStep()) { - const bool hard = record.GetHard(); - - auto processKey = [&](const TData::TKey& key, const TData::TValue& value) { - if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) { - const TLogoBlobID id(key.GetBlobId()); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT14, "DeleteKey", (Id, Self->GetLogId()), (BlobId, id)); - db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Delete(); - auto updateTrash = [&](TLogoBlobID id) { - db.Table<Schema::Trash>().Key(id.AsBinaryString()).Update(); - }; - Self->Data->DeleteKey(key, updateTrash, this); - ++NumKeysProcessed; - } - - return NumKeysProcessed < MaxKeysToProcessAtOnce; - }; - - const TData::TKey first(TLogoBlobID(record.GetTabletId(), 0, 0, record.GetChannel(), 0, 0)); - const TData::TKey last(TLogoBlobID(record.GetTabletId(), record.GetCollectGeneration(), - record.GetCollectStep(), record.GetChannel(), TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, - TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode)); - - Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_BEGIN | TData::EScanFlags::INCLUDE_END, - processKey); - - if (NumKeysProcessed == MaxKeysToProcessAtOnce) { - return false; - } - - const auto key = std::make_tuple(record.GetTabletId(), record.GetChannel()); - auto& barrier = Self->BarrierServer->Barriers[key]; - TGenStep& barrierGenCtr = record.GetHard() ? barrier.HardGenCtr : barrier.SoftGenCtr; - TGenStep& barrierGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; - - const TGenStep genCtr(record.GetGeneration(), record.GetPerGenerationCounter()); - const TGenStep collectGenStep(record.GetCollectGeneration(), record.GetCollectStep()); - Y_VERIFY(barrierGenCtr <= genCtr); - barrierGenCtr = genCtr; - Y_VERIFY(barrierGenStep <= collectGenStep); - barrierGenStep = collectGenStep; - - if (record.GetHard()) { - db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()).Update( - NIceDb::TUpdate<Schema::Barriers::HardGenCtr>(ui64(genCtr)), - NIceDb::TUpdate<Schema::Barriers::Hard>(ui64(collectGenStep)) - ); - } else { - db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()).Update( - NIceDb::TUpdate<Schema::Barriers::SoftGenCtr>(ui64(genCtr)), - NIceDb::TUpdate<Schema::Barriers::Soft>(ui64(collectGenStep)) - ); - } - } - - return true; - } }; void TBlobDepot::TBarrierServer::AddBarrierOnLoad(ui64 tabletId, ui8 channel, TGenStep softGenCtr, TGenStep soft, @@ -183,27 +176,33 @@ namespace NKikimr::NBlobDepot { .HardGenCtr = hardGenCtr, .Hard = hard, }; + + Self->BarrierServer->ValidateBlobInvariant(tabletId, channel); } - void TBlobDepot::TBarrierServer::AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier, - NTabletFlatExecutor::TTransactionContext& txc) { + bool TBlobDepot::TBarrierServer::AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier, + ui32& maxItems, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { NIceDb::TNiceDb db(txc.DB); const auto key = std::make_tuple(barrier.TabletId, barrier.Channel); auto& current = Barriers[key]; -#define DO(TYPE) \ +#define DO(TYPE, HARD) \ { \ const TGenStep barrierGenCtr(barrier.TYPE.RecordGeneration, barrier.TYPE.PerGenerationCounter); \ const TGenStep barrierCollect(barrier.TYPE.CollectGeneration, barrier.TYPE.CollectStep); \ if (current.TYPE##GenCtr < barrierGenCtr) { \ if (current.TYPE <= barrierCollect) { \ + if (!Self->Data->OnBarrierShift(barrier.TabletId, barrier.Channel, HARD, current.TYPE, \ + barrierCollect, maxItems, txc, cookie)) { \ + return false; \ + } \ current.TYPE##GenCtr = barrierGenCtr; \ current.TYPE = barrierCollect; \ db.Table<Schema::Barriers>().Key(barrier.TabletId, barrier.Channel).Update( \ NIceDb::TUpdate<Schema::Barriers::TYPE##GenCtr>(ui64(barrierGenCtr)), \ NIceDb::TUpdate<Schema::Barriers::TYPE>(ui64(barrierCollect)) \ ); \ - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT45, "replacing " #TYPE " barrier through decommission", \ + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT45, "replaced " #TYPE " barrier through decommission", \ (TabletId, barrier.TabletId), (Channel, int(barrier.Channel)), \ (GenCtr, current.TYPE##GenCtr), (Collect, current.TYPE), \ (Barrier, barrier)); \ @@ -221,8 +220,12 @@ namespace NKikimr::NBlobDepot { } \ } - DO(Hard) - DO(Soft) + DO(Hard, true) + DO(Soft, false) + + Self->BarrierServer->ValidateBlobInvariant(barrier.TabletId, barrier.Channel); + + return true; } void TBlobDepot::TBarrierServer::Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) { @@ -251,4 +254,52 @@ namespace NKikimr::NBlobDepot { } } + void TBlobDepot::TBarrierServer::ValidateBlobInvariant(ui64 tabletId, ui8 channel) { +#ifndef NDEBUG + for (const bool hard : {true, false}) { + const auto it = Barriers.find(std::make_tuple(tabletId, channel)); + Y_VERIFY(it != Barriers.end()); + const TBarrier& barrier = it->second; + const TGenStep& barrierGenStep = hard ? barrier.Hard : barrier.Soft; + const TData::TKey first(TLogoBlobID(tabletId, 0, 0, channel, 0, 0)); + const TData::TKey last(TLogoBlobID(tabletId, barrierGenStep.Generation(), barrierGenStep.Step(), + channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, + TLogoBlobID::MaxCrcMode)); + Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_BEGIN | TData::EScanFlags::INCLUDE_END, + [&](const TData::TKey& key, const TData::TValue& value) { + // there must be no blobs under the hard barrier and no blobs with mode other than Keep under the soft one + Y_VERIFY_S(!hard && value.KeepState == NKikimrBlobDepot::EKeepState::Keep, "Key# " << key.ToString() + << " Value# " << value.ToString()); + return true; + }); + } +# if 0 + Self->Data->ScanRange(nullptr, nullptr, {}, [&](const TData::TKey& key, const TData::TValue& value) { + bool underSoft, underHard; + Self->BarrierServer->GetBlobBarrierRelation(key.GetBlobId(), &underSoft, &underHard); + Y_VERIFY(!underHard && (!underSoft || value.KeepState == NKikimrBlobDepot::EKeepState::Keep)); + return true; + }); +# endif +#else + Y_UNUSED(tabletId); + Y_UNUSED(channel); +#endif + } + + void TBlobDepot::TBarrierServer::FinishRequest(ui64 tabletId, ui8 channel, std::optional<TString> error) { + const auto key = std::make_tuple(tabletId, channel); + auto& barrier = Barriers[key]; + Y_VERIFY(!barrier.ProcessingQ.empty()); + auto& request = barrier.ProcessingQ.front(); + auto [response, _] = TEvBlobDepot::MakeResponseFor(*request, Self->SelfId(), + error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(error)); + TActivationContext::Send(response.release()); + barrier.ProcessingQ.pop_front(); + Self->Data->HandleTrash(); + if (!barrier.ProcessingQ.empty()) { + Self->Execute(std::make_unique<TTxCollectGarbage>(Self, tabletId, channel)); + } + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/garbage_collection.h b/ydb/core/blob_depot/garbage_collection.h index 578f69ccba7..55c5fa01d6b 100644 --- a/ydb/core/blob_depot/garbage_collection.h +++ b/ydb/core/blob_depot/garbage_collection.h @@ -19,6 +19,7 @@ namespace NKikimr::NBlobDepot { THashMap<std::tuple<ui64, ui8>, TBarrier> Barriers; private: + class TTxAdvanceBarrier; class TTxCollectGarbage; public: @@ -27,11 +28,14 @@ namespace NKikimr::NBlobDepot { {} void AddBarrierOnLoad(ui64 tabletId, ui8 channel, TGenStep softGenCtr, TGenStep soft, TGenStep hardGenCtr, TGenStep hard); - void AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier, NTabletFlatExecutor::TTransactionContext& txc); + bool AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier, ui32& maxItems, + NTabletFlatExecutor::TTransactionContext& txc, void *cookie); void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev); void GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const; void OnDataLoaded(); + void ValidateBlobInvariant(ui64 tabletId, ui8 channel); + TString ToStringBarrier(ui64 tabletId, ui8 channel, bool hard) const { if (const auto it = Barriers.find(std::make_tuple(tabletId, channel)); it == Barriers.end()) { return "<none>"; @@ -49,6 +53,8 @@ namespace NKikimr::NBlobDepot { callback(tabletId, channel, value.SoftGenCtr, value.Soft, value.HardGenCtr, value.Hard); } } + + void FinishRequest(ui64 tabletId, ui8 channel, std::optional<TString> error); }; } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/given_id_range.cpp b/ydb/core/blob_depot/given_id_range.cpp index f66a1f3fc05..c65ca410cea 100644 --- a/ydb/core/blob_depot/given_id_range.cpp +++ b/ydb/core/blob_depot/given_id_range.cpp @@ -145,7 +145,12 @@ namespace NKikimr::NBlobDepot { myChunk -= otherChunk; NumAvailableItems -= otherChunk.Count(); - ++myIt; + if (myChunk.Count()) { + ++myIt; + } else { + myIt = Ranges.erase(myIt); + } + ++otherIt; } } @@ -155,7 +160,11 @@ namespace NKikimr::NBlobDepot { void TGivenIdRange::Output(IOutputStream& s) const { s << "{"; - for (const auto& [key, chunk] : Ranges) { + for (auto it = Ranges.begin(); it != Ranges.end(); ++it) { + const auto& [key, chunk] = *it; + if (it != Ranges.begin()) { + s << " "; + } s << key << ":"; for (ui32 i = 0; i < chunk.Size(); ++i) { s << int(chunk[i]); diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index fe83420eb30..173d7eec958 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -31,19 +31,8 @@ namespace NKikimr::NBlobDepot { auto *responseItem = responseRecord->AddItems(); responseItem->SetStatus(NKikimrProto::OK); - NKikimrBlobDepot::TValue value; - if (item.HasMeta()) { - value.SetMeta(item.GetMeta()); - } - if (const auto keepState = Self->Data->GetKeepState(key); keepState != NKikimrBlobDepot::EKeepState::Default) { - value.SetKeepState(keepState); - } - auto *chain = value.AddValueChain(); - auto *locator = chain->MutableLocator(); - locator->CopyFrom(item.GetBlobLocator()); - - const auto blobSeqId = TBlobSeqId::FromProto(locator->GetBlobSeqId()); - const bool canBeCollected = Self->Data->CanBeCollected(locator->GetGroupId(), blobSeqId); + const auto blobSeqId = TBlobSeqId::FromProto(item.GetBlobLocator().GetBlobSeqId()); + const bool canBeCollected = Self->Data->CanBeCollected(item.GetBlobLocator().GetGroupId(), blobSeqId); if (blobSeqId.Generation == generation) { // check for internal sanity -- we can't issue barriers on given ids without confirmed trimming @@ -60,20 +49,15 @@ namespace NKikimr::NBlobDepot { } TString error; - if (!CheckKeyAgainstBarrier(key, value, &error)) { + if (!CheckKeyAgainstBarrier(key, &error)) { responseItem->SetStatus(NKikimrProto::ERROR); responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << key.ToString() << " is being put beyond the barrier: " << error); continue; } - TString valueData; - const bool success = value.SerializeToString(&valueData); - Y_VERIFY(success); - - db.Table<Schema::Data>().Key(item.GetKey()).Update<Schema::Data::Value>(valueData); - - Self->Data->PutKey(std::move(key), TData::TValue(std::move(value))); + // obtain value + Self->Data->UpdateKey(key, item, txc, this); } return true; @@ -97,26 +81,29 @@ namespace NKikimr::NBlobDepot { } } - bool CheckKeyAgainstBarrier(const TData::TKey& key, const NKikimrBlobDepot::TValue& value, - TString *error) { + bool CheckKeyAgainstBarrier(const TData::TKey& key, TString *error) { const auto& v = key.AsVariant(); if (const auto *id = std::get_if<TLogoBlobID>(&v)) { bool underSoft, underHard; Self->BarrierServer->GetBlobBarrierRelation(*id, &underSoft, &underHard); if (underHard) { - *error = TStringBuilder() << "under barrier# " << Self->BarrierServer->ToStringBarrier( + *error = TStringBuilder() << "under hard barrier# " << Self->BarrierServer->ToStringBarrier( id->TabletID(), id->Channel(), true); return false; - } else if (underSoft && value.GetKeepState() != NKikimrBlobDepot::EKeepState::Keep) { - *error = TStringBuilder() << "under barrier# " << Self->BarrierServer->ToStringBarrier( - id->TabletID(), id->Channel(), false); - return false; + } else if (underSoft) { + const TData::TValue *value = Self->Data->FindKey(key); + if (!value || value->KeepState != NKikimrBlobDepot::EKeepState::Keep) { + *error = TStringBuilder() << "under soft barrier# " << Self->BarrierServer->ToStringBarrier( + id->TabletID(), id->Channel(), false); + return false; + } } } return true; } void Complete(const TActorContext&) override { + Self->Data->CommitTrash(this); Self->Data->HandleTrash(); TActivationContext::Send(Response.release()); } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index 53d8f837720..15afa7787e1 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -459,6 +459,7 @@ namespace NKikimr::NStorage { fFunc(TEvBlobStorage::TEvRange::EventType, HandleForwarded); fFunc(TEvBlobStorage::TEvCollectGarbage::EventType, HandleForwarded); fFunc(TEvBlobStorage::TEvStatus::EventType, HandleForwarded); + fFunc(TEvBlobStorage::TEvAssimilate::EventType, HandleForwarded); fFunc(TEvBlobStorage::TEvBunchOfEvents::EventType, HandleForwarded); fFunc(TEvRequestProxySessionsState::EventType, HandleForwarded); diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp index 8613e05c0ef..df61982aa02 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp +++ b/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp @@ -28,6 +28,10 @@ void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEve HANDLE_EVENT(TEvVGetResult, EvVGetResultSent) #undef HANDLE_EVENT + + case TEvBlobStorage::EvVAssimilateResult: // override channel for assimilation result + channel = TInterconnectChannels::IC_BLOBSTORAGE_ASYNC_DATA; + break; } auto event = std::make_unique<IEventHandle>(recipient, ctx.SelfID, ev, IEventHandle::MakeFlags(channel, 0), cookie); diff --git a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp index 182a2c513c4..d9c2625e6bb 100644 --- a/ydb/core/keyvalue/keyvalue_storage_read_request.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_read_request.cpp @@ -283,11 +283,12 @@ public: "Unexpected EvGetResult.", (KeyValue, TabletInfo->TabletID), (Status, result->Status), + (Id, response.Id), (ResponseStatus, response.Status), - (Deadline, IntermediateResult->Deadline.MilliSeconds()), - (Now, TActivationContext::Now().MilliSeconds()), + (Deadline, IntermediateResult->Deadline), + (Now, TActivationContext::Now()), (SentAt, batch.SentTime), - (GotAt, IntermediateResult->Stat.IntermediateCreatedAt.MilliSeconds()), + (GotAt, IntermediateResult->Stat.IntermediateCreatedAt), (ErrorReason, result->ErrorReason)); hasErrorResponses = true; } |