diff options
author | alexvru <alexvru@ydb.tech> | 2022-11-04 19:31:40 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-11-04 19:31:40 +0300 |
commit | ea78535b4b129ec0c018388786eea035eb81ec2c (patch) | |
tree | bb26a02ea735c10a1af5c4511f2f650eda78b42c | |
parent | 6f61e21ce1ec50faaae313dd52b1d8578f7a924f (diff) | |
download | ydb-ea78535b4b129ec0c018388786eea035eb81ec2c.tar.gz |
Fix uncertainty resolver error
-rw-r--r-- | ydb/core/blob_depot/CMakeLists.txt | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_put.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 257 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 37 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_load.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_mon.cpp | 38 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_trash.cpp | 179 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_uncertain.cpp | 23 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_uncertain.h | 8 | ||||
-rw-r--r-- | ydb/core/blob_depot/garbage_collection.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/mon_main.cpp | 169 | ||||
-rw-r--r-- | ydb/core/blob_depot/mon_main.h | 33 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_commit_blob_seq.cpp | 1 |
14 files changed, 460 insertions, 293 deletions
diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt index 24b4e70e6d6..b05e2e73a92 100644 --- a/ydb/core/blob_depot/CMakeLists.txt +++ b/ydb/core/blob_depot/CMakeLists.txt @@ -24,7 +24,9 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_load.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_mon.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_resolve.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_trash.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_uncertain.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/garbage_collection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/given_id_range.cpp diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index b5426c3ff45..e28a2c62f86 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -197,8 +197,8 @@ namespace NKikimr::NBlobDepot { (Channel, channel_), (GivenIdRanges, Channels[channel_].GivenIdRanges), (Agent.GivenIdRanges, agentGivenIdRange_)); agentGivenIdRange = {}; + Data->OnLeastExpectedBlobIdChange(channel); } - Data->HandleTrash(); } void TBlobDepot::InitChannelKinds() { diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 480992a67bf..a97a38b59a7 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -84,7 +84,7 @@ namespace NKikimr::NBlobDepot { std::optional<TBlobSeqId> blobSeqId = kind.Allocate(Agent); STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA21, "allocated BlobSeqId", (VirtualGroupId, Agent.VirtualGroupId), - (QueryId, GetQueryId()), (BlobSeqId, blobSeqId)); + (QueryId, GetQueryId()), (BlobSeqId, blobSeqId), (BlobId, msg.Id)); if (!blobSeqId) { return kind.EnqueueQueryWaitingForId(this); } diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index ba2641188ee..a74cac40ac1 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -70,26 +70,24 @@ namespace NKikimr::NBlobDepot { } template<typename T, typename... TArgs> - bool TData::UpdateKey(TKey key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie, T&& callback, TArgs&&... args) { + bool TData::UpdateKey(TKey key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie, const char *reason, + T&& callback, TArgs&&... args) { bool underSoft = false, underHard = false; auto var = key.AsVariant(); if (auto *id = std::get_if<TLogoBlobID>(&var)) { Self->BarrierServer->GetBlobBarrierRelation(*id, &underSoft, &underHard); } - if (underHard || underSoft) { + if (underHard) { if (const auto it = Data.find(key); it == Data.end()) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT59, "UpdateKey: key under hard barrier, will not be created", + (Id, Self->GetLogId()), (Key, key), (Reason, reason)); return false; // no such key existed and will not be created as it hits the barrier - } else { - Y_VERIFY_S(!underHard && it->second.KeepState == EKeepState::Keep, - "barrier invariant failed Key# " << key.ToString() << " Value# " << it->second.ToString()); } } const auto [it, inserted] = Data.try_emplace(std::move(key), std::forward<TArgs>(args)...); { auto& [key, value] = *it; - Y_VERIFY(!underHard); - Y_VERIFY(!underSoft || !inserted); std::vector<TLogoBlobID> deleteQ; const bool uncertainWriteBefore = value.UncertainWrite; @@ -106,44 +104,56 @@ namespace NKikimr::NBlobDepot { EUpdateOutcome outcome = callback(value, inserted); - Y_VERIFY(!value.UncertainWrite || !value.ValueChain.empty()); - Y_VERIFY(!inserted || outcome != EUpdateOutcome::NO_CHANGE); - if (underSoft && value.KeepState != EKeepState::Keep) { + if ((underSoft && value.KeepState != EKeepState::Keep) || underHard) { outcome = EUpdateOutcome::DROP; } + auto outcomeToString = [outcome] { + switch (outcome) { + case EUpdateOutcome::CHANGE: return "CHANGE"; + case EUpdateOutcome::NO_CHANGE: return "NO_CHANGE"; + case EUpdateOutcome::DROP: return "DROP"; + } + }; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT60, "UpdateKey", (Id, Self->GetLogId()), (Key, key), (Reason, reason), + (Outcome, outcomeToString()), (UnderSoft, underSoft), (Inserted, inserted), (Value, value), + (UncertainWriteBefore, uncertainWriteBefore)); + EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) { - const auto [it, inserted] = RefCount.try_emplace(id, 1); + const auto [it, inserted] = RefCount.try_emplace(id); if (inserted) { // first mention of this id auto& record = GetRecordsPerChannelGroup(id); const auto [_, inserted] = record.Used.insert(id); Y_VERIFY(inserted); AccountBlob(id, 1); + TotalStoredDataSize += id.BlobSize(); // blob is first mentioned and deleted as well if (outcome == EUpdateOutcome::DROP) { - it->second = 0; deleteQ.push_back(id); - } else { - TotalStoredDataSize += id.BlobSize(); } - } else if (outcome != EUpdateOutcome::DROP) { + } + if (outcome != EUpdateOutcome::DROP) { ++it->second; } }); - for (const TLogoBlobID& id : deleteQ) { + auto filter = [&](const TLogoBlobID& id) { const auto it = RefCount.find(id); Y_VERIFY(it != RefCount.end()); - if (!it->second) { + if (it->second) { + return true; // remove this blob from deletion queue, it still has references + } else { InFlightTrash.emplace(cookie, id); NIceDb::TNiceDb(txc.DB).Table<Schema::Trash>().Key(id.AsBinaryString()).Update(); RefCount.erase(it); TotalStoredDataSize -= id.BlobSize(); + return false; // keep this blob in deletion queue } - } + }; + deleteQ.erase(std::remove_if(deleteQ.begin(), deleteQ.end(), filter), deleteQ.end()); if (!deleteQ.empty()) { UncertaintyResolver->DropBlobs(deleteQ); } @@ -184,7 +194,7 @@ namespace NKikimr::NBlobDepot { void TData::UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item, bool uncertainWrite, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "UpdateKey", (Id, Self->GetLogId()), (Key, key), (Item, item)); - UpdateKey(key, txc, cookie, [&](TValue& value, bool inserted) { + UpdateKey(key, txc, cookie, "UpdateKey", [&](TValue& value, bool inserted) { if (!inserted) { // update value items value.Meta = item.GetMeta(); value.Public = false; @@ -255,7 +265,8 @@ namespace NKikimr::NBlobDepot { TTabletStorageInfo *info = Self->Info(); const ui32 groupId = info->GroupFor(id.Channel(), id.Generation()); Y_VERIFY(groupId != Max<ui32>()); - const auto& key = std::make_tuple(id.TabletID(), id.Channel(), groupId); + Y_VERIFY(id.TabletID() == info->TabletID); + const auto& key = std::make_tuple(id.Channel(), groupId); const auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key); return it->second; } @@ -266,7 +277,7 @@ namespace NKikimr::NBlobDepot { const bool success = proto.ParseFromString(value); Y_VERIFY(success); - UpdateKey(std::move(key), txc, cookie, [&](TValue& value, bool inserted) { + UpdateKey(std::move(key), txc, cookie, "AddDataOnLoad", [&](TValue& value, bool inserted) { if (!inserted) { // do some merge logic value.KeepState = Max(value.KeepState, proto.GetKeepState()); if (value.ValueChain.empty() && proto.ValueChainSize()) { @@ -286,7 +297,7 @@ namespace NKikimr::NBlobDepot { void TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { - UpdateKey(TKey(blob.Id), txc, cookie, [&](TValue& value, bool inserted) { + UpdateKey(TKey(blob.Id), txc, cookie, "AddDataOnDecommit", [&](TValue& value, bool inserted) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "AddDataOnDecommit", (Id, Self->GetLogId()), (Blob, blob), (Value, value), (Inserted, inserted)); @@ -309,12 +320,11 @@ namespace NKikimr::NBlobDepot { void TData::AddTrashOnLoad(TLogoBlobID id) { auto& record = GetRecordsPerChannelGroup(id); record.Trash.insert(id); - record.EnqueueForCollectionIfPossible(this); AccountBlob(id, true); } void TData::AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep) { - const auto& key = std::make_tuple(Self->TabletID(), channel, groupId); + const auto& key = std::make_tuple(channel, groupId); const auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key); auto& record = it->second; record.IssuedGenStep = issuedGenStep; @@ -323,7 +333,7 @@ namespace NKikimr::NBlobDepot { bool TData::UpdateKeepState(TKey key, EKeepState keepState, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { - return UpdateKey(std::move(key), txc, cookie, [&](TValue& value, bool inserted) { + return UpdateKey(std::move(key), txc, cookie, "UpdateKeepState", [&](TValue& value, bool inserted) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT51, "UpdateKeepState", (Id, Self->GetLogId()), (Key, key), (KeepState, keepState), (Value, value)); if (inserted) { @@ -339,181 +349,12 @@ namespace NKikimr::NBlobDepot { void TData::DeleteKey(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT14, "DeleteKey", (Id, Self->GetLogId()), (Key, key)); - UpdateKey(key, txc, cookie, [&](TValue&, bool inserted) { + UpdateKey(key, txc, cookie, "DeleteKey", [&](TValue&, bool inserted) { Y_VERIFY(!inserted); return EUpdateOutcome::DROP; }); } - void TData::CommitTrash(void *cookie) { - auto [first, last] = InFlightTrash.equal_range(cookie); - for (auto it = first; it != last; ++it) { - auto& record = GetRecordsPerChannelGroup(it->second); - record.MoveToTrash(this, it->second); - } - InFlightTrash.erase(first, last); - } - - void TData::HandleTrash() { - const ui32 generation = Self->Executor()->Generation(); - THashMap<ui32, std::unique_ptr<TEvBlobDepot::TEvPushNotify>> outbox; - - while (RecordsWithTrash) { - TRecordsPerChannelGroup& record = *RecordsWithTrash.PopFront(); - - Y_VERIFY(!record.CollectGarbageRequestInFlight); - Y_VERIFY(record.TabletId == Self->TabletID()); - Y_VERIFY(!record.Trash.empty()); - - Y_VERIFY(record.Channel < Self->Channels.size()); - auto& channel = Self->Channels[record.Channel]; - - TGenStep nextGenStep(*--record.Trash.end()); - std::set<TLogoBlobID>::iterator trashEndIter = record.Trash.end(); - - // step we are going to invalidate (including blobs with this one) - if (TGenStep(record.LeastExpectedBlobId) <= nextGenStep) { - const ui32 invalidatedStep = nextGenStep.Step(); // the step we want to invalidate and garbage collect - - // remove invalidated step from allocations - auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId); - Y_VERIFY(record.LastConfirmedGenStep < TGenStep(blobSeqId)); - if (blobSeqId.Step <= invalidatedStep) { - blobSeqId.Step = invalidatedStep + 1; - blobSeqId.Index = 0; - channel.NextBlobSeqId = blobSeqId.ToSequentialNumber(); - } - - // issue notifications to agents - for (auto& [agentId, agent] : Self->Agents) { - if (!agent.AgentId) { - continue; - } - const auto [it, inserted] = agent.InvalidatedStepInFlight.emplace(record.Channel, invalidatedStep); - if (inserted || it->second < invalidatedStep) { - it->second = invalidatedStep; - - auto& ev = outbox[agentId]; - if (!ev) { - ev.reset(new TEvBlobDepot::TEvPushNotify); - } - auto *item = ev->Record.AddInvalidatedSteps(); - item->SetChannel(record.Channel); - item->SetGeneration(generation); - item->SetInvalidatedStep(invalidatedStep); - } - } - - // adjust the barrier to keep it safe now - const TLogoBlobID maxId(record.TabletId, record.LeastExpectedBlobId.Generation, - record.LeastExpectedBlobId.Step, record.Channel, 0, 0); - trashEndIter = record.Trash.lower_bound(maxId); - if (trashEndIter != record.Trash.begin()) { - nextGenStep = TGenStep(*std::prev(trashEndIter)); - } else { - nextGenStep = {}; - } - } - - TVector<TLogoBlobID> keep; - TVector<TLogoBlobID> doNotKeep; - std::vector<TLogoBlobID> trashInFlight; - - for (auto it = record.Trash.begin(); it != trashEndIter; ++it) { - if (const TGenStep genStep(*it); genStep <= record.LastConfirmedGenStep) { - doNotKeep.push_back(*it); - } else if (nextGenStep < genStep) { - Y_FAIL(); - } - trashInFlight.push_back(*it); - } - - const TLogoBlobID keepFrom(record.TabletId, record.LastConfirmedGenStep.Generation(), - record.LastConfirmedGenStep.Step(), record.Channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, - TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode); - for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && TGenStep(*it) <= nextGenStep; ++it) { - Y_VERIFY(record.LastConfirmedGenStep < TGenStep(*it)); - keep.push_back(*it); - } - - const bool collect = nextGenStep > record.LastConfirmedGenStep; - - if (trashInFlight.empty()) { - Y_VERIFY(keep.empty()); - continue; // nothing to do here - } - - auto keep_ = keep ? std::make_unique<TVector<TLogoBlobID>>(std::move(keep)) : nullptr; - auto doNotKeep_ = doNotKeep ? std::make_unique<TVector<TLogoBlobID>>(std::move(doNotKeep)) : nullptr; - auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(record.TabletId, generation, - record.PerGenerationCounter, record.Channel, collect, nextGenStep.Generation(), nextGenStep.Step(), - keep_.get(), doNotKeep_.get(), TInstant::Max(), true); - - keep_.release(); - doNotKeep_.release(); - - record.CollectGarbageRequestInFlight = true; - record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0; - record.TrashInFlight.swap(trashInFlight); - record.IssuedGenStep = Max(nextGenStep, record.LastConfirmedGenStep); - - Y_VERIFY(trashInFlight.empty()); - - record.TIntrusiveListItem<TRecordsPerChannelGroup, TRecordWithTrash>::Unlink(); - - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "issuing TEvCollectGarbage", (Id, Self->GetLogId()), - (Channel, int(record.Channel)), (GroupId, record.GroupId), (Msg, ev->ToString()), - (LastConfirmedGenStep, record.LastConfirmedGenStep), (IssuedGenStep, record.IssuedGenStep), - (TrashInFlight.size, record.TrashInFlight.size())); - - const ui64 id = ++LastCollectCmdId; - CollectCmdToGroup.emplace(id, record.GroupId); - - if (collect) { - ExecuteIssueGC(record.Channel, record.GroupId, record.IssuedGenStep, std::move(ev), id); - } else { - SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), id); - } - } - - for (auto& [agentId, ev] : outbox) { - TAgent& agent = Self->GetAgent(agentId); - const ui64 id = ++agent.LastRequestId; - auto& request = agent.InvalidateStepRequests[id]; - for (const auto& item : ev->Record.GetInvalidatedSteps()) { - request[item.GetChannel()] = item.GetInvalidatedStep(); - } - - Y_VERIFY(agent.AgentId); - agent.PushCallbacks.emplace(id, std::bind(&TData::OnPushNotifyResult, this, std::placeholders::_1)); - TActivationContext::Send(new IEventHandle(*agent.AgentId, Self->SelfId(), ev.release(), 0, id)); - } - } - - void TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "TEvCollectGarbageResult", (Id, Self->GetLogId()), - (Channel, ev->Get()->Channel), (GroupId, ev->Cookie), (Msg, ev->Get()->ToString())); - - auto cmd = CollectCmdToGroup.extract(ev->Cookie); - Y_VERIFY(cmd); - const ui32 groupId = cmd.mapped(); - - const auto& key = std::make_tuple(ev->Get()->TabletId, ev->Get()->Channel, groupId); - const auto it = RecordsPerChannelGroup.find(key); - Y_VERIFY(it != RecordsPerChannelGroup.end()); - auto& record = it->second; - if (ev->Get()->Status == NKikimrProto::OK) { - Y_VERIFY(record.CollectGarbageRequestInFlight); - record.OnSuccessfulCollect(this); - ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}), - record.LastConfirmedGenStep); - } else { - record.TrashInFlight.clear(); - record.ClearInFlight(this); - HandleTrash(); - } - } - void TData::OnPushNotifyResult(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { TAgent& agent = Self->GetAgent(ev->Recipient); @@ -570,15 +411,6 @@ namespace NKikimr::NBlobDepot { OnLeastExpectedBlobIdChange(channel); } - - HandleTrash(); - } - - void TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) { - const auto& key = std::make_tuple(Self->TabletID(), channel, groupId); - const auto it = RecordsPerChannelGroup.find(key); - Y_VERIFY(it != RecordsPerChannelGroup.end()); - it->second.ClearInFlight(this); } bool TData::OnBarrierShift(ui64 tabletId, ui8 channel, bool hard, TGenStep previous, TGenStep current, ui32& maxItems, @@ -616,7 +448,7 @@ namespace NKikimr::NBlobDepot { } bool TData::CanBeCollected(ui32 groupId, TBlobSeqId id) const { - const auto it = RecordsPerChannelGroup.find(std::make_tuple(Self->TabletID(), id.Channel, groupId)); + const auto it = RecordsPerChannelGroup.find(std::make_tuple(id.Channel, groupId)); return it != RecordsPerChannelGroup.end() && TGenStep(id) <= it->second.IssuedGenStep; } @@ -632,18 +464,17 @@ namespace NKikimr::NBlobDepot { const TTabletChannelInfo *storageChannel = info->ChannelInfo(leastExpectedBlobId.Channel); Y_VERIFY(storageChannel); for (const auto& entry : storageChannel->History) { - const auto& key = std::make_tuple(info->TabletID, storageChannel->Channel, entry.GroupID); + const auto& key = std::make_tuple(storageChannel->Channel, entry.GroupID); auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key); auto& record = it->second; record.OnLeastExpectedBlobIdChange(this, leastExpectedBlobId); } } - void TData::TRecordsPerChannelGroup::MoveToTrash(TData *self, TLogoBlobID id) { + void TData::TRecordsPerChannelGroup::MoveToTrash(TLogoBlobID id) { const auto usedIt = Used.find(id); Y_VERIFY(usedIt != Used.end()); Trash.insert(Used.extract(usedIt)); - EnqueueForCollectionIfPossible(self); } void TData::TRecordsPerChannelGroup::OnSuccessfulCollect(TData *self) { @@ -662,19 +493,19 @@ namespace NKikimr::NBlobDepot { << " Next# " << leastExpectedBlobId.ToString()); if (LeastExpectedBlobId < leastExpectedBlobId) { LeastExpectedBlobId = leastExpectedBlobId; - EnqueueForCollectionIfPossible(self); + CollectIfPossible(self); } } void TData::TRecordsPerChannelGroup::ClearInFlight(TData *self) { Y_VERIFY(CollectGarbageRequestInFlight); CollectGarbageRequestInFlight = false; - EnqueueForCollectionIfPossible(self); + CollectIfPossible(self); } - void TData::TRecordsPerChannelGroup::EnqueueForCollectionIfPossible(TData *self) { - if (!CollectGarbageRequestInFlight && TabletId == self->Self->TabletID() && Empty() && !Trash.empty()) { - self->RecordsWithTrash.PushBack(this); + void TData::TRecordsPerChannelGroup::CollectIfPossible(TData *self) { + if (!CollectGarbageRequestInFlight && !Trash.empty()) { + self->HandleTrash(*this); } } diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 2bed10ad48a..2e12413baf7 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -318,10 +318,7 @@ namespace NKikimr::NBlobDepot { private: struct TRecordWithTrash {}; - struct TRecordsPerChannelGroup - : TIntrusiveListItem<TRecordsPerChannelGroup, TRecordWithTrash> - { - const ui64 TabletId; + struct TRecordsPerChannelGroup { const ui8 Channel; const ui32 GroupId; @@ -334,24 +331,22 @@ namespace NKikimr::NBlobDepot { bool CollectGarbageRequestInFlight = false; TBlobSeqId LeastExpectedBlobId; - TRecordsPerChannelGroup(ui64 tabletId, ui8 channel, ui32 groupId) - : TabletId(tabletId) - , Channel(channel) + TRecordsPerChannelGroup(ui8 channel, ui32 groupId) + : Channel(channel) , GroupId(groupId) {} - void MoveToTrash(TData *self, TLogoBlobID id); + void MoveToTrash(TLogoBlobID id); void OnSuccessfulCollect(TData *self); void OnLeastExpectedBlobIdChange(TData *self, TBlobSeqId leastExpectedBlobId); void ClearInFlight(TData *self); - void EnqueueForCollectionIfPossible(TData *self); + void CollectIfPossible(TData *self); }; bool Loaded = false; std::map<TKey, TValue> Data; THashMap<TLogoBlobID, ui32> RefCount; - THashMap<std::tuple<ui64, ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup; - TIntrusiveList<TRecordsPerChannelGroup, TRecordWithTrash> RecordsWithTrash; + THashMap<std::tuple<ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup; std::optional<TKey> LastLoadedKey; // keys are being loaded in ascending order std::optional<TLogoBlobID> LastAssimilatedBlobId; ui64 TotalStoredDataSize = 0; @@ -422,10 +417,21 @@ namespace NKikimr::NBlobDepot { return true; } + template<typename TCallback> + void ShowRange(const std::optional<TKey>& seek, ui32 rowsBefore, ui32 rowsAfter, TCallback&& callback) { + auto it = seek ? Data.lower_bound(*seek) : Data.begin(); + for (; it != Data.begin() && rowsBefore; --it, --rowsBefore, ++rowsAfter) + {} + for (; it != Data.end() && rowsAfter; ++it, --rowsAfter) { + callback(it->first, it->second); + } + } + const TValue *FindKey(const TKey& key) const; template<typename T, typename... TArgs> - bool UpdateKey(TKey key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie, T&& callback, TArgs&&... args); + bool UpdateKey(TKey key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie, const char *reason, + T&& callback, TArgs&&... args); void UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item, bool uncertainWrite, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); @@ -441,11 +447,10 @@ namespace NKikimr::NBlobDepot { void AddTrashOnLoad(TLogoBlobID id); void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep); - bool UpdateKeepState(TKey key, EKeepState keepState, - NTabletFlatExecutor::TTransactionContext& txc, void *cookie); + bool UpdateKeepState(TKey key, EKeepState keepState, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); void DeleteKey(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); void CommitTrash(void *cookie); - void HandleTrash(); + void HandleTrash(TRecordsPerChannelGroup& record); void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev); void OnPushNotifyResult(TEvBlobDepot::TEvPushNotifyResult::TPtr ev); void OnCommitConfirmedGC(ui8 channel, ui32 groupId); @@ -486,6 +491,8 @@ namespace NKikimr::NBlobDepot { return TotalStoredDataSize; } + void RenderMainPage(IOutputStream& s); + private: void ExecuteIssueGC(ui8 channel, ui32 groupId, TGenStep issuedGenStep, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage, ui64 cookie); diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp index fc9b9e2e97b..e2a487bf62b 100644 --- a/ydb/core/blob_depot/data_load.cpp +++ b/ydb/core/blob_depot/data_load.cpp @@ -162,7 +162,6 @@ namespace NKikimr::NBlobDepot { void TData::OnLoadComplete() { Loaded = true; Self->BarrierServer->OnDataLoaded(); - HandleTrash(); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data_mon.cpp b/ydb/core/blob_depot/data_mon.cpp new file mode 100644 index 00000000000..e7f68c4eb45 --- /dev/null +++ b/ydb/core/blob_depot/data_mon.cpp @@ -0,0 +1,38 @@ +#include "data.h" +#include "data_uncertain.h" +#include "mon_main.h" + +namespace NKikimr::NBlobDepot { + + using TData = TBlobDepot::TData; + + void TData::RenderMainPage(IOutputStream& s) { + HTML(s) { + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + s << "Main"; + } + DIV_CLASS("panel-body") { + KEYVALUE_TABLE({ + KEYVALUE_P("Loaded", Loaded ? "true" : "false"); + KEYVALUE_P("Last loaded key", LastLoadedKey ? LastLoadedKey->ToString() : "<null>"); + KEYVALUE_P("Last assimilated blob id", LastAssimilatedBlobId ? LastAssimilatedBlobId->ToString() : "<null>"); + KEYVALUE_P("Data size, number of keys", Data.size()); + KEYVALUE_P("RefCount size, number of blobs", RefCount.size()); + KEYVALUE_P("Total stored data size, bytes", FormatByteSize(TotalStoredDataSize)); + KEYVALUE_P("Keys made certain, number of keys", KeysMadeCertain.size()); + }) + } + } + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + s << "Uncertainty resolver"; + } + DIV_CLASS("panel-body") { + UncertaintyResolver->RenderMainPage(s); + } + } + } + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp new file mode 100644 index 00000000000..91b68d09479 --- /dev/null +++ b/ydb/core/blob_depot/data_trash.cpp @@ -0,0 +1,179 @@ +#include "data.h" + +namespace NKikimr::NBlobDepot { + + using TData = TBlobDepot::TData; + + void TData::CommitTrash(void *cookie) { + auto [first, last] = InFlightTrash.equal_range(cookie); + std::unordered_set<TRecordsPerChannelGroup*> records; + for (auto it = first; it != last; ++it) { + auto& record = GetRecordsPerChannelGroup(it->second); + record.MoveToTrash(it->second); + records.insert(&record); + } + InFlightTrash.erase(first, last); + + for (TRecordsPerChannelGroup *record : records) { + record->CollectIfPossible(this); + } + } + + void TData::HandleTrash(TRecordsPerChannelGroup& record) { + const ui32 generation = Self->Executor()->Generation(); + THashMap<ui32, std::unique_ptr<TEvBlobDepot::TEvPushNotify>> outbox; + + Y_VERIFY(!record.CollectGarbageRequestInFlight); + Y_VERIFY(!record.Trash.empty()); + + Y_VERIFY(record.Channel < Self->Channels.size()); + auto& channel = Self->Channels[record.Channel]; + + TGenStep nextGenStep(*--record.Trash.end()); + std::set<TLogoBlobID>::iterator trashEndIter = record.Trash.end(); + + // step we are going to invalidate (including blobs with this one) + if (TGenStep(record.LeastExpectedBlobId) <= nextGenStep) { + const ui32 invalidatedStep = nextGenStep.Step(); // the step we want to invalidate and garbage collect + + // remove invalidated step from allocations + auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId); + Y_VERIFY(record.LastConfirmedGenStep < TGenStep(blobSeqId)); + if (blobSeqId.Step <= invalidatedStep) { + blobSeqId.Step = invalidatedStep + 1; + blobSeqId.Index = 0; + channel.NextBlobSeqId = blobSeqId.ToSequentialNumber(); + } + + // issue notifications to agents + for (auto& [agentId, agent] : Self->Agents) { + if (!agent.AgentId) { + continue; + } + const auto [it, inserted] = agent.InvalidatedStepInFlight.emplace(record.Channel, invalidatedStep); + if (inserted || it->second < invalidatedStep) { + it->second = invalidatedStep; + + auto& ev = outbox[agentId]; + if (!ev) { + ev.reset(new TEvBlobDepot::TEvPushNotify); + } + auto *item = ev->Record.AddInvalidatedSteps(); + item->SetChannel(record.Channel); + item->SetGeneration(generation); + item->SetInvalidatedStep(invalidatedStep); + } + } + + // adjust the barrier to keep it safe now + const TLogoBlobID maxId(Self->TabletID(), record.LeastExpectedBlobId.Generation, + record.LeastExpectedBlobId.Step, record.Channel, 0, 0); + trashEndIter = record.Trash.lower_bound(maxId); + if (trashEndIter != record.Trash.begin()) { + nextGenStep = TGenStep(*std::prev(trashEndIter)); + } else { + nextGenStep = {}; + } + } + + TVector<TLogoBlobID> keep; + TVector<TLogoBlobID> doNotKeep; + std::vector<TLogoBlobID> trashInFlight; + + for (auto it = record.Trash.begin(); it != trashEndIter; ++it) { + if (const TGenStep genStep(*it); genStep <= record.LastConfirmedGenStep) { + doNotKeep.push_back(*it); + } else if (nextGenStep < genStep) { + Y_FAIL(); + } + trashInFlight.push_back(*it); + } + + const TLogoBlobID keepFrom(Self->TabletID(), record.LastConfirmedGenStep.Generation(), + record.LastConfirmedGenStep.Step(), record.Channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, + TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode); + for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && TGenStep(*it) <= nextGenStep; ++it) { + Y_VERIFY(record.LastConfirmedGenStep < TGenStep(*it)); + keep.push_back(*it); + } + + const bool collect = nextGenStep > record.LastConfirmedGenStep; + + if (trashInFlight.empty()) { + Y_VERIFY(keep.empty()); // nothing to do here + } else { + auto keep_ = keep ? std::make_unique<TVector<TLogoBlobID>>(std::move(keep)) : nullptr; + auto doNotKeep_ = doNotKeep ? std::make_unique<TVector<TLogoBlobID>>(std::move(doNotKeep)) : nullptr; + + auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(Self->TabletID(), generation, + record.PerGenerationCounter, record.Channel, collect, nextGenStep.Generation(), nextGenStep.Step(), + keep_.get(), doNotKeep_.get(), TInstant::Max(), true); + + keep_.release(); + doNotKeep_.release(); + + record.CollectGarbageRequestInFlight = true; + record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0; + record.TrashInFlight.swap(trashInFlight); + record.IssuedGenStep = Max(nextGenStep, record.LastConfirmedGenStep); + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "issuing TEvCollectGarbage", (Id, Self->GetLogId()), + (Channel, int(record.Channel)), (GroupId, record.GroupId), (Msg, ev->ToString()), + (LastConfirmedGenStep, record.LastConfirmedGenStep), (IssuedGenStep, record.IssuedGenStep), + (TrashInFlight.size, record.TrashInFlight.size())); + + const ui64 id = ++LastCollectCmdId; + CollectCmdToGroup.emplace(id, record.GroupId); + + if (collect) { + ExecuteIssueGC(record.Channel, record.GroupId, record.IssuedGenStep, std::move(ev), id); + } else { + SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), id); + } + } + + for (auto& [agentId, ev] : outbox) { + TAgent& agent = Self->GetAgent(agentId); + const ui64 id = ++agent.LastRequestId; + auto& request = agent.InvalidateStepRequests[id]; + for (const auto& item : ev->Record.GetInvalidatedSteps()) { + request[item.GetChannel()] = item.GetInvalidatedStep(); + } + + Y_VERIFY(agent.AgentId); + agent.PushCallbacks.emplace(id, std::bind(&TData::OnPushNotifyResult, this, std::placeholders::_1)); + TActivationContext::Send(new IEventHandle(*agent.AgentId, Self->SelfId(), ev.release(), 0, id)); + } + } + + void TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "TEvCollectGarbageResult", (Id, Self->GetLogId()), + (Channel, ev->Get()->Channel), (GroupId, ev->Cookie), (Msg, ev->Get()->ToString())); + + auto cmd = CollectCmdToGroup.extract(ev->Cookie); + Y_VERIFY(cmd); + const ui32 groupId = cmd.mapped(); + + const auto& key = std::make_tuple(ev->Get()->Channel, groupId); + const auto it = RecordsPerChannelGroup.find(key); + Y_VERIFY(it != RecordsPerChannelGroup.end()); + auto& record = it->second; + if (ev->Get()->Status == NKikimrProto::OK) { + Y_VERIFY(record.CollectGarbageRequestInFlight); + record.OnSuccessfulCollect(this); + ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}), + record.LastConfirmedGenStep); + } else { + record.TrashInFlight.clear(); + record.ClearInFlight(this); + } + } + + void TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) { + const auto& key = std::make_tuple(channel, groupId); + const auto it = RecordsPerChannelGroup.find(key); + Y_VERIFY(it != RecordsPerChannelGroup.end()); + it->second.ClearInFlight(this); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data_uncertain.cpp b/ydb/core/blob_depot/data_uncertain.cpp index 5474a98f833..5404c3455dc 100644 --- a/ydb/core/blob_depot/data_uncertain.cpp +++ b/ydb/core/blob_depot/data_uncertain.cpp @@ -1,5 +1,6 @@ #include "data.h" #include "data_uncertain.h" +#include "mon_main.h" namespace NKikimr::NBlobDepot { @@ -32,6 +33,8 @@ namespace NKikimr::NBlobDepot { if (entry->NumUncertainKeys == 0) { // we had no more uncertain keys to resolve entry->Result.Send(Self->SelfId(), NKikimrProto::OK, std::nullopt); + } else { + NumKeysQueried += entry->NumUncertainKeys; } } @@ -47,6 +50,7 @@ namespace NKikimr::NBlobDepot { void TData::TUncertaintyResolver::DropKey(const TKey& key) { FinishKey(key, false); + ++NumKeysDropped; } void TData::TUncertaintyResolver::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { @@ -97,6 +101,7 @@ namespace NKikimr::NBlobDepot { const ui32 groupId = Self->Info()->GroupFor(id.Channel(), id.Generation()); SendToBSProxy(Self->SelfId(), groupId, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead, true, true)); + ++NumGetsIssued; } okay = false; @@ -116,10 +121,12 @@ namespace NKikimr::NBlobDepot { case EKeyBlobState::WASNT_WRITTEN: // the blob hasn't been written completely; this may also be a race when it is being written // right now, but we are asking for the data too early (like in scan request) + okay = false; break; case EKeyBlobState::ERROR: // we can't figure out this blob's state + okay = false; break; } }); @@ -139,6 +146,8 @@ namespace NKikimr::NBlobDepot { return; } + ++(success ? NumKeysResolved : NumKeysUnresolved); + auto item = Keys.extract(keyIt); auto& keyContext = item.mapped(); @@ -166,4 +175,18 @@ namespace NKikimr::NBlobDepot { } } + void TData::TUncertaintyResolver::RenderMainPage(IOutputStream& s) { + HTML(s) { + KEYVALUE_TABLE({ + KEYVALUE_P("Keys queried", NumKeysQueried); + KEYVALUE_P("Gets issued", NumGetsIssued); + KEYVALUE_P("Keys resolved", NumKeysResolved); + KEYVALUE_P("Keys unresolved", NumKeysUnresolved); + KEYVALUE_P("Keys dropped", NumKeysDropped); + KEYVALUE_P("Keys being processed", Keys.size()); + KEYVALUE_P("Blobs in flight", Blobs.size()); + }) + } + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data_uncertain.h b/ydb/core/blob_depot/data_uncertain.h index 66ca3a7ab6d..da42cfc47ce 100644 --- a/ydb/core/blob_depot/data_uncertain.h +++ b/ydb/core/blob_depot/data_uncertain.h @@ -48,6 +48,12 @@ namespace NKikimr::NBlobDepot { TKeys Keys; std::unordered_map<TLogoBlobID, TBlobContext> Blobs; + ui64 NumKeysQueried = 0; + ui64 NumGetsIssued = 0; + ui64 NumKeysResolved = 0; + ui64 NumKeysUnresolved = 0; + ui64 NumKeysDropped = 0; + public: TUncertaintyResolver(TBlobDepot *self); void PushResultWithUncertainties(TResolveResultAccumulator&& result, std::deque<TKey>&& uncertainties); @@ -56,6 +62,8 @@ namespace NKikimr::NBlobDepot { void DropKey(const TKey& key); void Handle(TEvBlobStorage::TEvGetResult::TPtr ev); + void RenderMainPage(IOutputStream& s); + private: void FinishBlob(TLogoBlobID id, EKeyBlobState state); void CheckAndFinishKeyIfPossible(TKeys::value_type *keyRecord); diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp index b90254400bc..62f7088b605 100644 --- a/ydb/core/blob_depot/garbage_collection.cpp +++ b/ydb/core/blob_depot/garbage_collection.cpp @@ -296,7 +296,6 @@ namespace NKikimr::NBlobDepot { error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(error)); TActivationContext::Send(response.release()); barrier.ProcessingQ.pop_front(); - Self->Data->HandleTrash(); if (!barrier.ProcessingQ.empty()) { Self->Execute(std::make_unique<TTxCollectGarbage>(Self, tabletId, channel)); } diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp index 2cb2f3c0c81..28d4711477e 100644 --- a/ydb/core/blob_depot/mon_main.cpp +++ b/ydb/core/blob_depot/mon_main.cpp @@ -3,6 +3,7 @@ #include "garbage_collection.h" #include "blocks.h" #include "space_monitor.h" +#include "mon_main.h" namespace NKikimr::NBlobDepot { @@ -113,6 +114,10 @@ namespace NKikimr::NBlobDepot { } void RenderDataTable(bool header) { + std::optional<TData::TKey> seek; + ui32 rowsAfter = 100; + ui32 rowsBefore = 0; + HTML(Stream) { if (header) { TABLEH() { Stream << "key"; } @@ -120,7 +125,73 @@ namespace NKikimr::NBlobDepot { TABLEH() { Stream << "keep state"; } TABLEH() { Stream << "barrier"; } } else { - Self->Data->ScanRange(nullptr, nullptr, 0, [&](const TData::TKey& key, const TData::TValue& value) { + const TCgiParameters& cgi = Request->Get()->Cgi(); + if (cgi.Has("seek")) { + if (const TString& value = cgi.Get("seek")) { + if (Self->Config.HasVirtualGroupId()) { + TString error; + TLogoBlobID id; + if (TLogoBlobID::Parse(id, value, error)) { + seek.emplace(id); + } else { + DIV() { + Stream << "invalid seek value: " << error; + } + } + } else { + seek.emplace(value); + } + } + } + if (cgi.Has("rowsBefore") && !TryFromString(cgi.Get("rowsBefore"), rowsBefore)) { + DIV() { + Stream << "invalid rowsBefore value"; + } + } + if (cgi.Has("rowsAfter") && !TryFromString(cgi.Get("rowsAfter"), rowsAfter)) { + DIV() { + Stream << "invalid rowsAfter value"; + } + } + + FORM_CLASS("form-horizontal") { + DIV_CLASS("control-group") { + LABEL_CLASS_FOR("control-label", "inputSeek") { + Stream << "Seek"; + } + DIV_CLASS("controls") { + Stream << "<input id='inputSeek' name='seek' type='text' value='" << cgi.Get("seek") << "'/>"; + } + } + DIV_CLASS("control-group") { + LABEL_CLASS_FOR("control-label", "inputRowsBefore") { + Stream << "Rows before"; + } + DIV_CLASS("controls") { + Stream << "<input id='inputRowsBefore' name='rowsBefore' type='number' value='" << + rowsBefore << "'/>"; + } + } + DIV_CLASS("control-group") { + LABEL_CLASS_FOR("control-label", "inputRowsAfter") { + Stream << "Rows after"; + } + DIV_CLASS("controls") { + Stream << "<input id='inputRowsAfter' name='rowsAfter' type='number' value='" << + rowsAfter << "'/>"; + } + } + DIV_CLASS("control-group") { + DIV_CLASS("controls") { + Stream << "<input type='hidden' name='TabletID' value='" << Self->TabletID() << "'/>"; + Stream << "<input type='hidden' name='page' value='data'/>"; + Stream << "<input type='hidden' name='table' value='data'/>"; + Stream << "<button type='submit' class='btn btn-default'>Show</button>"; + } + } + } + + Self->Data->ShowRange(seek, rowsBefore, rowsAfter, [&](const TData::TKey& key, const TData::TValue& value) { TABLER() { TABLED() { key.Output(Stream); @@ -158,7 +229,6 @@ namespace NKikimr::NBlobDepot { } } } - return true; }); } } @@ -261,19 +331,12 @@ namespace NKikimr::NBlobDepot { space.try_emplace(groupId); } - auto outSize = [&](ui64 size) { - static const char *suffixes[] = { - "B", "KiB", "MiB", "GiB", "TiB", "PiB", nullptr - }; - FormatHumanReadable(Stream, size, 1024, 2, suffixes); - }; - for (const auto& [groupId, value] : space) { const auto& [current, total] = value; TABLER() { TABLED() { Stream << groupId; } - TABLED() { outSize(current); } - TABLED() { outSize(total); } + TABLED() { Stream << FormatByteSize(current); } + TABLED() { Stream << FormatByteSize(total); } const auto it = Self->SpaceMonitor->Groups.find(groupId); if (it != Self->SpaceMonitor->Groups.end()) { @@ -345,61 +408,47 @@ namespace NKikimr::NBlobDepot { s << "Stats"; } DIV_CLASS("panel-body") { - TABLE_CLASS("table") { - TABLEHEAD() { - TABLER() { - TABLEH() { s << "Parameter"; } - TABLEH() { s << "Value"; } + KEYVALUE_TABLE({ + + TABLER() { + TABLED() { s << "Data, bytes"; } + TABLED() { + ui64 total = 0; + Data->EnumerateRefCount([&](TLogoBlobID id, ui32 /*refCount*/) { + total += id.BlobSize(); + }); + s << FormatByteSize(total); } } - TABLEBODY() { - auto outSize = [&](ui64 size) { - static const char *suffixes[] = { - "B", "KiB", "MiB", "GiB", "TiB", "PiB", nullptr - }; - FormatHumanReadable(s, size, 1024, 2, suffixes); - }; - TABLER() { - TABLED() { s << "Data, bytes"; } - TABLED() { - ui64 total = 0; - Data->EnumerateRefCount([&](TLogoBlobID id, ui32 /*refCount*/) { - total += id.BlobSize(); - }); - outSize(total); - } - } - - ui64 trashInFlight = 0; - ui64 trashPending = 0; - Data->EnumerateTrash([&](ui32 /*groupId*/, TLogoBlobID id, bool inFlight) { - (inFlight ? trashInFlight : trashPending) += id.BlobSize(); - }); - TABLER() { - TABLED() { s << "Trash in flight, bytes"; } - TABLED() { outSize(trashInFlight); } - } + ui64 trashInFlight = 0; + ui64 trashPending = 0; + Data->EnumerateTrash([&](ui32 /*groupId*/, TLogoBlobID id, bool inFlight) { + (inFlight ? trashInFlight : trashPending) += id.BlobSize(); + }); - TABLER() { - TABLED() { s << "Trash pending, bytes"; } - TABLED() { outSize(trashPending); } - } + KEYVALUE_P("Trash in flight, bytes", FormatByteSize(trashInFlight)); + KEYVALUE_P("Trash pending, bytes", FormatByteSize(trashPending)); - std::vector<ui32> groups; - for (const auto& [groupId, _] : Groups) { - groups.push_back(groupId); - } - std::sort(groups.begin(), groups.end()); - for (const ui32 groupId : groups) { - TGroupInfo& group = Groups[groupId]; - TABLER() { - TABLED() { s << "Data in GroupId# " << groupId << ", bytes"; } - TABLED() { outSize(group.AllocatedBytes); } - } - } + std::vector<ui32> groups; + for (const auto& [groupId, _] : Groups) { + groups.push_back(groupId); } - } + std::sort(groups.begin(), groups.end()); + for (const ui32 groupId : groups) { + TGroupInfo& group = Groups[groupId]; + KEYVALUE_P(TStringBuilder() << "Data in GroupId# " << groupId << ", bytes", + FormatByteSize(group.AllocatedBytes)); + } + }) + } + } + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + s << "Data"; + } + DIV_CLASS("panel-body") { + Data->RenderMainPage(s); } } } diff --git a/ydb/core/blob_depot/mon_main.h b/ydb/core/blob_depot/mon_main.h new file mode 100644 index 00000000000..b9aa0fd39bc --- /dev/null +++ b/ydb/core/blob_depot/mon_main.h @@ -0,0 +1,33 @@ +#pragma once + +#include "defs.h" + +namespace NKikimr::NBlobDepot { + + #define KEYVALUE_TABLE(BODY) \ + TABLE_CLASS("table") { \ + TABLEHEAD() { \ + TABLER() { \ + TABLEH() { s << "Parameter"; } \ + TABLEH() { s << "Value"; } \ + } \ + } \ + TABLEBODY() { \ + BODY \ + } \ + } + + #define KEYVALUE_P(KEY, VALUE) \ + TABLER() { \ + TABLED() { __stream << (KEY); } \ + TABLED() { __stream << (VALUE); } \ + } + + inline TString FormatByteSize(ui64 size) { + static const char *suffixes[] = {"B", "KiB", "MiB", "GiB", "TiB", "PiB", nullptr}; + TStringStream s; + FormatHumanReadable(s, size, 1024, 2, suffixes); + return s.Str(); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index fa51308a05a..a8501a57d13 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -124,7 +124,6 @@ namespace NKikimr::NBlobDepot { void Complete(const TActorContext&) override { Self->Data->CommitTrash(this); - Self->Data->HandleTrash(); TActivationContext::Send(Response.release()); } }; |