aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-11-04 19:31:40 +0300
committeralexvru <alexvru@ydb.tech>2022-11-04 19:31:40 +0300
commitea78535b4b129ec0c018388786eea035eb81ec2c (patch)
treebb26a02ea735c10a1af5c4511f2f650eda78b42c
parent6f61e21ce1ec50faaae313dd52b1d8578f7a924f (diff)
downloadydb-ea78535b4b129ec0c018388786eea035eb81ec2c.tar.gz
Fix uncertainty resolver error
-rw-r--r--ydb/core/blob_depot/CMakeLists.txt2
-rw-r--r--ydb/core/blob_depot/agent.cpp2
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp2
-rw-r--r--ydb/core/blob_depot/data.cpp257
-rw-r--r--ydb/core/blob_depot/data.h37
-rw-r--r--ydb/core/blob_depot/data_load.cpp1
-rw-r--r--ydb/core/blob_depot/data_mon.cpp38
-rw-r--r--ydb/core/blob_depot/data_trash.cpp179
-rw-r--r--ydb/core/blob_depot/data_uncertain.cpp23
-rw-r--r--ydb/core/blob_depot/data_uncertain.h8
-rw-r--r--ydb/core/blob_depot/garbage_collection.cpp1
-rw-r--r--ydb/core/blob_depot/mon_main.cpp169
-rw-r--r--ydb/core/blob_depot/mon_main.h33
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp1
14 files changed, 460 insertions, 293 deletions
diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt
index 24b4e70e6d6..b05e2e73a92 100644
--- a/ydb/core/blob_depot/CMakeLists.txt
+++ b/ydb/core/blob_depot/CMakeLists.txt
@@ -24,7 +24,9 @@ target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_load.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_mon.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_resolve.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_trash.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_uncertain.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/garbage_collection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/given_id_range.cpp
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index b5426c3ff45..e28a2c62f86 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -197,8 +197,8 @@ namespace NKikimr::NBlobDepot {
(Channel, channel_), (GivenIdRanges, Channels[channel_].GivenIdRanges),
(Agent.GivenIdRanges, agentGivenIdRange_));
agentGivenIdRange = {};
+ Data->OnLeastExpectedBlobIdChange(channel);
}
- Data->HandleTrash();
}
void TBlobDepot::InitChannelKinds() {
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 480992a67bf..a97a38b59a7 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -84,7 +84,7 @@ namespace NKikimr::NBlobDepot {
std::optional<TBlobSeqId> blobSeqId = kind.Allocate(Agent);
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA21, "allocated BlobSeqId", (VirtualGroupId, Agent.VirtualGroupId),
- (QueryId, GetQueryId()), (BlobSeqId, blobSeqId));
+ (QueryId, GetQueryId()), (BlobSeqId, blobSeqId), (BlobId, msg.Id));
if (!blobSeqId) {
return kind.EnqueueQueryWaitingForId(this);
}
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index ba2641188ee..a74cac40ac1 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -70,26 +70,24 @@ namespace NKikimr::NBlobDepot {
}
template<typename T, typename... TArgs>
- bool TData::UpdateKey(TKey key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie, T&& callback, TArgs&&... args) {
+ bool TData::UpdateKey(TKey key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie, const char *reason,
+ T&& callback, TArgs&&... args) {
bool underSoft = false, underHard = false;
auto var = key.AsVariant();
if (auto *id = std::get_if<TLogoBlobID>(&var)) {
Self->BarrierServer->GetBlobBarrierRelation(*id, &underSoft, &underHard);
}
- if (underHard || underSoft) {
+ if (underHard) {
if (const auto it = Data.find(key); it == Data.end()) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT59, "UpdateKey: key under hard barrier, will not be created",
+ (Id, Self->GetLogId()), (Key, key), (Reason, reason));
return false; // no such key existed and will not be created as it hits the barrier
- } else {
- Y_VERIFY_S(!underHard && it->second.KeepState == EKeepState::Keep,
- "barrier invariant failed Key# " << key.ToString() << " Value# " << it->second.ToString());
}
}
const auto [it, inserted] = Data.try_emplace(std::move(key), std::forward<TArgs>(args)...);
{
auto& [key, value] = *it;
- Y_VERIFY(!underHard);
- Y_VERIFY(!underSoft || !inserted);
std::vector<TLogoBlobID> deleteQ;
const bool uncertainWriteBefore = value.UncertainWrite;
@@ -106,44 +104,56 @@ namespace NKikimr::NBlobDepot {
EUpdateOutcome outcome = callback(value, inserted);
- Y_VERIFY(!value.UncertainWrite || !value.ValueChain.empty());
-
Y_VERIFY(!inserted || outcome != EUpdateOutcome::NO_CHANGE);
- if (underSoft && value.KeepState != EKeepState::Keep) {
+ if ((underSoft && value.KeepState != EKeepState::Keep) || underHard) {
outcome = EUpdateOutcome::DROP;
}
+ auto outcomeToString = [outcome] {
+ switch (outcome) {
+ case EUpdateOutcome::CHANGE: return "CHANGE";
+ case EUpdateOutcome::NO_CHANGE: return "NO_CHANGE";
+ case EUpdateOutcome::DROP: return "DROP";
+ }
+ };
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT60, "UpdateKey", (Id, Self->GetLogId()), (Key, key), (Reason, reason),
+ (Outcome, outcomeToString()), (UnderSoft, underSoft), (Inserted, inserted), (Value, value),
+ (UncertainWriteBefore, uncertainWriteBefore));
+
EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) {
- const auto [it, inserted] = RefCount.try_emplace(id, 1);
+ const auto [it, inserted] = RefCount.try_emplace(id);
if (inserted) {
// first mention of this id
auto& record = GetRecordsPerChannelGroup(id);
const auto [_, inserted] = record.Used.insert(id);
Y_VERIFY(inserted);
AccountBlob(id, 1);
+ TotalStoredDataSize += id.BlobSize();
// blob is first mentioned and deleted as well
if (outcome == EUpdateOutcome::DROP) {
- it->second = 0;
deleteQ.push_back(id);
- } else {
- TotalStoredDataSize += id.BlobSize();
}
- } else if (outcome != EUpdateOutcome::DROP) {
+ }
+ if (outcome != EUpdateOutcome::DROP) {
++it->second;
}
});
- for (const TLogoBlobID& id : deleteQ) {
+ auto filter = [&](const TLogoBlobID& id) {
const auto it = RefCount.find(id);
Y_VERIFY(it != RefCount.end());
- if (!it->second) {
+ if (it->second) {
+ return true; // remove this blob from deletion queue, it still has references
+ } else {
InFlightTrash.emplace(cookie, id);
NIceDb::TNiceDb(txc.DB).Table<Schema::Trash>().Key(id.AsBinaryString()).Update();
RefCount.erase(it);
TotalStoredDataSize -= id.BlobSize();
+ return false; // keep this blob in deletion queue
}
- }
+ };
+ deleteQ.erase(std::remove_if(deleteQ.begin(), deleteQ.end(), filter), deleteQ.end());
if (!deleteQ.empty()) {
UncertaintyResolver->DropBlobs(deleteQ);
}
@@ -184,7 +194,7 @@ namespace NKikimr::NBlobDepot {
void TData::UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item, bool uncertainWrite,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "UpdateKey", (Id, Self->GetLogId()), (Key, key), (Item, item));
- UpdateKey(key, txc, cookie, [&](TValue& value, bool inserted) {
+ UpdateKey(key, txc, cookie, "UpdateKey", [&](TValue& value, bool inserted) {
if (!inserted) { // update value items
value.Meta = item.GetMeta();
value.Public = false;
@@ -255,7 +265,8 @@ namespace NKikimr::NBlobDepot {
TTabletStorageInfo *info = Self->Info();
const ui32 groupId = info->GroupFor(id.Channel(), id.Generation());
Y_VERIFY(groupId != Max<ui32>());
- const auto& key = std::make_tuple(id.TabletID(), id.Channel(), groupId);
+ Y_VERIFY(id.TabletID() == info->TabletID);
+ const auto& key = std::make_tuple(id.Channel(), groupId);
const auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key);
return it->second;
}
@@ -266,7 +277,7 @@ namespace NKikimr::NBlobDepot {
const bool success = proto.ParseFromString(value);
Y_VERIFY(success);
- UpdateKey(std::move(key), txc, cookie, [&](TValue& value, bool inserted) {
+ UpdateKey(std::move(key), txc, cookie, "AddDataOnLoad", [&](TValue& value, bool inserted) {
if (!inserted) { // do some merge logic
value.KeepState = Max(value.KeepState, proto.GetKeepState());
if (value.ValueChain.empty() && proto.ValueChainSize()) {
@@ -286,7 +297,7 @@ namespace NKikimr::NBlobDepot {
void TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
- UpdateKey(TKey(blob.Id), txc, cookie, [&](TValue& value, bool inserted) {
+ UpdateKey(TKey(blob.Id), txc, cookie, "AddDataOnDecommit", [&](TValue& value, bool inserted) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "AddDataOnDecommit", (Id, Self->GetLogId()), (Blob, blob),
(Value, value), (Inserted, inserted));
@@ -309,12 +320,11 @@ namespace NKikimr::NBlobDepot {
void TData::AddTrashOnLoad(TLogoBlobID id) {
auto& record = GetRecordsPerChannelGroup(id);
record.Trash.insert(id);
- record.EnqueueForCollectionIfPossible(this);
AccountBlob(id, true);
}
void TData::AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep) {
- const auto& key = std::make_tuple(Self->TabletID(), channel, groupId);
+ const auto& key = std::make_tuple(channel, groupId);
const auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key);
auto& record = it->second;
record.IssuedGenStep = issuedGenStep;
@@ -323,7 +333,7 @@ namespace NKikimr::NBlobDepot {
bool TData::UpdateKeepState(TKey key, EKeepState keepState,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
- return UpdateKey(std::move(key), txc, cookie, [&](TValue& value, bool inserted) {
+ return UpdateKey(std::move(key), txc, cookie, "UpdateKeepState", [&](TValue& value, bool inserted) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT51, "UpdateKeepState", (Id, Self->GetLogId()), (Key, key),
(KeepState, keepState), (Value, value));
if (inserted) {
@@ -339,181 +349,12 @@ namespace NKikimr::NBlobDepot {
void TData::DeleteKey(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT14, "DeleteKey", (Id, Self->GetLogId()), (Key, key));
- UpdateKey(key, txc, cookie, [&](TValue&, bool inserted) {
+ UpdateKey(key, txc, cookie, "DeleteKey", [&](TValue&, bool inserted) {
Y_VERIFY(!inserted);
return EUpdateOutcome::DROP;
});
}
- void TData::CommitTrash(void *cookie) {
- auto [first, last] = InFlightTrash.equal_range(cookie);
- for (auto it = first; it != last; ++it) {
- auto& record = GetRecordsPerChannelGroup(it->second);
- record.MoveToTrash(this, it->second);
- }
- InFlightTrash.erase(first, last);
- }
-
- void TData::HandleTrash() {
- const ui32 generation = Self->Executor()->Generation();
- THashMap<ui32, std::unique_ptr<TEvBlobDepot::TEvPushNotify>> outbox;
-
- while (RecordsWithTrash) {
- TRecordsPerChannelGroup& record = *RecordsWithTrash.PopFront();
-
- Y_VERIFY(!record.CollectGarbageRequestInFlight);
- Y_VERIFY(record.TabletId == Self->TabletID());
- Y_VERIFY(!record.Trash.empty());
-
- Y_VERIFY(record.Channel < Self->Channels.size());
- auto& channel = Self->Channels[record.Channel];
-
- TGenStep nextGenStep(*--record.Trash.end());
- std::set<TLogoBlobID>::iterator trashEndIter = record.Trash.end();
-
- // step we are going to invalidate (including blobs with this one)
- if (TGenStep(record.LeastExpectedBlobId) <= nextGenStep) {
- const ui32 invalidatedStep = nextGenStep.Step(); // the step we want to invalidate and garbage collect
-
- // remove invalidated step from allocations
- auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId);
- Y_VERIFY(record.LastConfirmedGenStep < TGenStep(blobSeqId));
- if (blobSeqId.Step <= invalidatedStep) {
- blobSeqId.Step = invalidatedStep + 1;
- blobSeqId.Index = 0;
- channel.NextBlobSeqId = blobSeqId.ToSequentialNumber();
- }
-
- // issue notifications to agents
- for (auto& [agentId, agent] : Self->Agents) {
- if (!agent.AgentId) {
- continue;
- }
- const auto [it, inserted] = agent.InvalidatedStepInFlight.emplace(record.Channel, invalidatedStep);
- if (inserted || it->second < invalidatedStep) {
- it->second = invalidatedStep;
-
- auto& ev = outbox[agentId];
- if (!ev) {
- ev.reset(new TEvBlobDepot::TEvPushNotify);
- }
- auto *item = ev->Record.AddInvalidatedSteps();
- item->SetChannel(record.Channel);
- item->SetGeneration(generation);
- item->SetInvalidatedStep(invalidatedStep);
- }
- }
-
- // adjust the barrier to keep it safe now
- const TLogoBlobID maxId(record.TabletId, record.LeastExpectedBlobId.Generation,
- record.LeastExpectedBlobId.Step, record.Channel, 0, 0);
- trashEndIter = record.Trash.lower_bound(maxId);
- if (trashEndIter != record.Trash.begin()) {
- nextGenStep = TGenStep(*std::prev(trashEndIter));
- } else {
- nextGenStep = {};
- }
- }
-
- TVector<TLogoBlobID> keep;
- TVector<TLogoBlobID> doNotKeep;
- std::vector<TLogoBlobID> trashInFlight;
-
- for (auto it = record.Trash.begin(); it != trashEndIter; ++it) {
- if (const TGenStep genStep(*it); genStep <= record.LastConfirmedGenStep) {
- doNotKeep.push_back(*it);
- } else if (nextGenStep < genStep) {
- Y_FAIL();
- }
- trashInFlight.push_back(*it);
- }
-
- const TLogoBlobID keepFrom(record.TabletId, record.LastConfirmedGenStep.Generation(),
- record.LastConfirmedGenStep.Step(), record.Channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie,
- TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode);
- for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && TGenStep(*it) <= nextGenStep; ++it) {
- Y_VERIFY(record.LastConfirmedGenStep < TGenStep(*it));
- keep.push_back(*it);
- }
-
- const bool collect = nextGenStep > record.LastConfirmedGenStep;
-
- if (trashInFlight.empty()) {
- Y_VERIFY(keep.empty());
- continue; // nothing to do here
- }
-
- auto keep_ = keep ? std::make_unique<TVector<TLogoBlobID>>(std::move(keep)) : nullptr;
- auto doNotKeep_ = doNotKeep ? std::make_unique<TVector<TLogoBlobID>>(std::move(doNotKeep)) : nullptr;
- auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(record.TabletId, generation,
- record.PerGenerationCounter, record.Channel, collect, nextGenStep.Generation(), nextGenStep.Step(),
- keep_.get(), doNotKeep_.get(), TInstant::Max(), true);
-
- keep_.release();
- doNotKeep_.release();
-
- record.CollectGarbageRequestInFlight = true;
- record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0;
- record.TrashInFlight.swap(trashInFlight);
- record.IssuedGenStep = Max(nextGenStep, record.LastConfirmedGenStep);
-
- Y_VERIFY(trashInFlight.empty());
-
- record.TIntrusiveListItem<TRecordsPerChannelGroup, TRecordWithTrash>::Unlink();
-
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "issuing TEvCollectGarbage", (Id, Self->GetLogId()),
- (Channel, int(record.Channel)), (GroupId, record.GroupId), (Msg, ev->ToString()),
- (LastConfirmedGenStep, record.LastConfirmedGenStep), (IssuedGenStep, record.IssuedGenStep),
- (TrashInFlight.size, record.TrashInFlight.size()));
-
- const ui64 id = ++LastCollectCmdId;
- CollectCmdToGroup.emplace(id, record.GroupId);
-
- if (collect) {
- ExecuteIssueGC(record.Channel, record.GroupId, record.IssuedGenStep, std::move(ev), id);
- } else {
- SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), id);
- }
- }
-
- for (auto& [agentId, ev] : outbox) {
- TAgent& agent = Self->GetAgent(agentId);
- const ui64 id = ++agent.LastRequestId;
- auto& request = agent.InvalidateStepRequests[id];
- for (const auto& item : ev->Record.GetInvalidatedSteps()) {
- request[item.GetChannel()] = item.GetInvalidatedStep();
- }
-
- Y_VERIFY(agent.AgentId);
- agent.PushCallbacks.emplace(id, std::bind(&TData::OnPushNotifyResult, this, std::placeholders::_1));
- TActivationContext::Send(new IEventHandle(*agent.AgentId, Self->SelfId(), ev.release(), 0, id));
- }
- }
-
- void TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "TEvCollectGarbageResult", (Id, Self->GetLogId()),
- (Channel, ev->Get()->Channel), (GroupId, ev->Cookie), (Msg, ev->Get()->ToString()));
-
- auto cmd = CollectCmdToGroup.extract(ev->Cookie);
- Y_VERIFY(cmd);
- const ui32 groupId = cmd.mapped();
-
- const auto& key = std::make_tuple(ev->Get()->TabletId, ev->Get()->Channel, groupId);
- const auto it = RecordsPerChannelGroup.find(key);
- Y_VERIFY(it != RecordsPerChannelGroup.end());
- auto& record = it->second;
- if (ev->Get()->Status == NKikimrProto::OK) {
- Y_VERIFY(record.CollectGarbageRequestInFlight);
- record.OnSuccessfulCollect(this);
- ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}),
- record.LastConfirmedGenStep);
- } else {
- record.TrashInFlight.clear();
- record.ClearInFlight(this);
- HandleTrash();
- }
- }
-
void TData::OnPushNotifyResult(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
TAgent& agent = Self->GetAgent(ev->Recipient);
@@ -570,15 +411,6 @@ namespace NKikimr::NBlobDepot {
OnLeastExpectedBlobIdChange(channel);
}
-
- HandleTrash();
- }
-
- void TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) {
- const auto& key = std::make_tuple(Self->TabletID(), channel, groupId);
- const auto it = RecordsPerChannelGroup.find(key);
- Y_VERIFY(it != RecordsPerChannelGroup.end());
- it->second.ClearInFlight(this);
}
bool TData::OnBarrierShift(ui64 tabletId, ui8 channel, bool hard, TGenStep previous, TGenStep current, ui32& maxItems,
@@ -616,7 +448,7 @@ namespace NKikimr::NBlobDepot {
}
bool TData::CanBeCollected(ui32 groupId, TBlobSeqId id) const {
- const auto it = RecordsPerChannelGroup.find(std::make_tuple(Self->TabletID(), id.Channel, groupId));
+ const auto it = RecordsPerChannelGroup.find(std::make_tuple(id.Channel, groupId));
return it != RecordsPerChannelGroup.end() && TGenStep(id) <= it->second.IssuedGenStep;
}
@@ -632,18 +464,17 @@ namespace NKikimr::NBlobDepot {
const TTabletChannelInfo *storageChannel = info->ChannelInfo(leastExpectedBlobId.Channel);
Y_VERIFY(storageChannel);
for (const auto& entry : storageChannel->History) {
- const auto& key = std::make_tuple(info->TabletID, storageChannel->Channel, entry.GroupID);
+ const auto& key = std::make_tuple(storageChannel->Channel, entry.GroupID);
auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key);
auto& record = it->second;
record.OnLeastExpectedBlobIdChange(this, leastExpectedBlobId);
}
}
- void TData::TRecordsPerChannelGroup::MoveToTrash(TData *self, TLogoBlobID id) {
+ void TData::TRecordsPerChannelGroup::MoveToTrash(TLogoBlobID id) {
const auto usedIt = Used.find(id);
Y_VERIFY(usedIt != Used.end());
Trash.insert(Used.extract(usedIt));
- EnqueueForCollectionIfPossible(self);
}
void TData::TRecordsPerChannelGroup::OnSuccessfulCollect(TData *self) {
@@ -662,19 +493,19 @@ namespace NKikimr::NBlobDepot {
<< " Next# " << leastExpectedBlobId.ToString());
if (LeastExpectedBlobId < leastExpectedBlobId) {
LeastExpectedBlobId = leastExpectedBlobId;
- EnqueueForCollectionIfPossible(self);
+ CollectIfPossible(self);
}
}
void TData::TRecordsPerChannelGroup::ClearInFlight(TData *self) {
Y_VERIFY(CollectGarbageRequestInFlight);
CollectGarbageRequestInFlight = false;
- EnqueueForCollectionIfPossible(self);
+ CollectIfPossible(self);
}
- void TData::TRecordsPerChannelGroup::EnqueueForCollectionIfPossible(TData *self) {
- if (!CollectGarbageRequestInFlight && TabletId == self->Self->TabletID() && Empty() && !Trash.empty()) {
- self->RecordsWithTrash.PushBack(this);
+ void TData::TRecordsPerChannelGroup::CollectIfPossible(TData *self) {
+ if (!CollectGarbageRequestInFlight && !Trash.empty()) {
+ self->HandleTrash(*this);
}
}
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 2bed10ad48a..2e12413baf7 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -318,10 +318,7 @@ namespace NKikimr::NBlobDepot {
private:
struct TRecordWithTrash {};
- struct TRecordsPerChannelGroup
- : TIntrusiveListItem<TRecordsPerChannelGroup, TRecordWithTrash>
- {
- const ui64 TabletId;
+ struct TRecordsPerChannelGroup {
const ui8 Channel;
const ui32 GroupId;
@@ -334,24 +331,22 @@ namespace NKikimr::NBlobDepot {
bool CollectGarbageRequestInFlight = false;
TBlobSeqId LeastExpectedBlobId;
- TRecordsPerChannelGroup(ui64 tabletId, ui8 channel, ui32 groupId)
- : TabletId(tabletId)
- , Channel(channel)
+ TRecordsPerChannelGroup(ui8 channel, ui32 groupId)
+ : Channel(channel)
, GroupId(groupId)
{}
- void MoveToTrash(TData *self, TLogoBlobID id);
+ void MoveToTrash(TLogoBlobID id);
void OnSuccessfulCollect(TData *self);
void OnLeastExpectedBlobIdChange(TData *self, TBlobSeqId leastExpectedBlobId);
void ClearInFlight(TData *self);
- void EnqueueForCollectionIfPossible(TData *self);
+ void CollectIfPossible(TData *self);
};
bool Loaded = false;
std::map<TKey, TValue> Data;
THashMap<TLogoBlobID, ui32> RefCount;
- THashMap<std::tuple<ui64, ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup;
- TIntrusiveList<TRecordsPerChannelGroup, TRecordWithTrash> RecordsWithTrash;
+ THashMap<std::tuple<ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup;
std::optional<TKey> LastLoadedKey; // keys are being loaded in ascending order
std::optional<TLogoBlobID> LastAssimilatedBlobId;
ui64 TotalStoredDataSize = 0;
@@ -422,10 +417,21 @@ namespace NKikimr::NBlobDepot {
return true;
}
+ template<typename TCallback>
+ void ShowRange(const std::optional<TKey>& seek, ui32 rowsBefore, ui32 rowsAfter, TCallback&& callback) {
+ auto it = seek ? Data.lower_bound(*seek) : Data.begin();
+ for (; it != Data.begin() && rowsBefore; --it, --rowsBefore, ++rowsAfter)
+ {}
+ for (; it != Data.end() && rowsAfter; ++it, --rowsAfter) {
+ callback(it->first, it->second);
+ }
+ }
+
const TValue *FindKey(const TKey& key) const;
template<typename T, typename... TArgs>
- bool UpdateKey(TKey key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie, T&& callback, TArgs&&... args);
+ bool UpdateKey(TKey key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie, const char *reason,
+ T&& callback, TArgs&&... args);
void UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item, bool uncertainWrite,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
@@ -441,11 +447,10 @@ namespace NKikimr::NBlobDepot {
void AddTrashOnLoad(TLogoBlobID id);
void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep);
- bool UpdateKeepState(TKey key, EKeepState keepState,
- NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
+ bool UpdateKeepState(TKey key, EKeepState keepState, NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
void DeleteKey(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
void CommitTrash(void *cookie);
- void HandleTrash();
+ void HandleTrash(TRecordsPerChannelGroup& record);
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev);
void OnPushNotifyResult(TEvBlobDepot::TEvPushNotifyResult::TPtr ev);
void OnCommitConfirmedGC(ui8 channel, ui32 groupId);
@@ -486,6 +491,8 @@ namespace NKikimr::NBlobDepot {
return TotalStoredDataSize;
}
+ void RenderMainPage(IOutputStream& s);
+
private:
void ExecuteIssueGC(ui8 channel, ui32 groupId, TGenStep issuedGenStep,
std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage, ui64 cookie);
diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp
index fc9b9e2e97b..e2a487bf62b 100644
--- a/ydb/core/blob_depot/data_load.cpp
+++ b/ydb/core/blob_depot/data_load.cpp
@@ -162,7 +162,6 @@ namespace NKikimr::NBlobDepot {
void TData::OnLoadComplete() {
Loaded = true;
Self->BarrierServer->OnDataLoaded();
- HandleTrash();
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data_mon.cpp b/ydb/core/blob_depot/data_mon.cpp
new file mode 100644
index 00000000000..e7f68c4eb45
--- /dev/null
+++ b/ydb/core/blob_depot/data_mon.cpp
@@ -0,0 +1,38 @@
+#include "data.h"
+#include "data_uncertain.h"
+#include "mon_main.h"
+
+namespace NKikimr::NBlobDepot {
+
+ using TData = TBlobDepot::TData;
+
+ void TData::RenderMainPage(IOutputStream& s) {
+ HTML(s) {
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ s << "Main";
+ }
+ DIV_CLASS("panel-body") {
+ KEYVALUE_TABLE({
+ KEYVALUE_P("Loaded", Loaded ? "true" : "false");
+ KEYVALUE_P("Last loaded key", LastLoadedKey ? LastLoadedKey->ToString() : "<null>");
+ KEYVALUE_P("Last assimilated blob id", LastAssimilatedBlobId ? LastAssimilatedBlobId->ToString() : "<null>");
+ KEYVALUE_P("Data size, number of keys", Data.size());
+ KEYVALUE_P("RefCount size, number of blobs", RefCount.size());
+ KEYVALUE_P("Total stored data size, bytes", FormatByteSize(TotalStoredDataSize));
+ KEYVALUE_P("Keys made certain, number of keys", KeysMadeCertain.size());
+ })
+ }
+ }
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ s << "Uncertainty resolver";
+ }
+ DIV_CLASS("panel-body") {
+ UncertaintyResolver->RenderMainPage(s);
+ }
+ }
+ }
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp
new file mode 100644
index 00000000000..91b68d09479
--- /dev/null
+++ b/ydb/core/blob_depot/data_trash.cpp
@@ -0,0 +1,179 @@
+#include "data.h"
+
+namespace NKikimr::NBlobDepot {
+
+ using TData = TBlobDepot::TData;
+
+ void TData::CommitTrash(void *cookie) {
+ auto [first, last] = InFlightTrash.equal_range(cookie);
+ std::unordered_set<TRecordsPerChannelGroup*> records;
+ for (auto it = first; it != last; ++it) {
+ auto& record = GetRecordsPerChannelGroup(it->second);
+ record.MoveToTrash(it->second);
+ records.insert(&record);
+ }
+ InFlightTrash.erase(first, last);
+
+ for (TRecordsPerChannelGroup *record : records) {
+ record->CollectIfPossible(this);
+ }
+ }
+
+ void TData::HandleTrash(TRecordsPerChannelGroup& record) {
+ const ui32 generation = Self->Executor()->Generation();
+ THashMap<ui32, std::unique_ptr<TEvBlobDepot::TEvPushNotify>> outbox;
+
+ Y_VERIFY(!record.CollectGarbageRequestInFlight);
+ Y_VERIFY(!record.Trash.empty());
+
+ Y_VERIFY(record.Channel < Self->Channels.size());
+ auto& channel = Self->Channels[record.Channel];
+
+ TGenStep nextGenStep(*--record.Trash.end());
+ std::set<TLogoBlobID>::iterator trashEndIter = record.Trash.end();
+
+ // step we are going to invalidate (including blobs with this one)
+ if (TGenStep(record.LeastExpectedBlobId) <= nextGenStep) {
+ const ui32 invalidatedStep = nextGenStep.Step(); // the step we want to invalidate and garbage collect
+
+ // remove invalidated step from allocations
+ auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId);
+ Y_VERIFY(record.LastConfirmedGenStep < TGenStep(blobSeqId));
+ if (blobSeqId.Step <= invalidatedStep) {
+ blobSeqId.Step = invalidatedStep + 1;
+ blobSeqId.Index = 0;
+ channel.NextBlobSeqId = blobSeqId.ToSequentialNumber();
+ }
+
+ // issue notifications to agents
+ for (auto& [agentId, agent] : Self->Agents) {
+ if (!agent.AgentId) {
+ continue;
+ }
+ const auto [it, inserted] = agent.InvalidatedStepInFlight.emplace(record.Channel, invalidatedStep);
+ if (inserted || it->second < invalidatedStep) {
+ it->second = invalidatedStep;
+
+ auto& ev = outbox[agentId];
+ if (!ev) {
+ ev.reset(new TEvBlobDepot::TEvPushNotify);
+ }
+ auto *item = ev->Record.AddInvalidatedSteps();
+ item->SetChannel(record.Channel);
+ item->SetGeneration(generation);
+ item->SetInvalidatedStep(invalidatedStep);
+ }
+ }
+
+ // adjust the barrier to keep it safe now
+ const TLogoBlobID maxId(Self->TabletID(), record.LeastExpectedBlobId.Generation,
+ record.LeastExpectedBlobId.Step, record.Channel, 0, 0);
+ trashEndIter = record.Trash.lower_bound(maxId);
+ if (trashEndIter != record.Trash.begin()) {
+ nextGenStep = TGenStep(*std::prev(trashEndIter));
+ } else {
+ nextGenStep = {};
+ }
+ }
+
+ TVector<TLogoBlobID> keep;
+ TVector<TLogoBlobID> doNotKeep;
+ std::vector<TLogoBlobID> trashInFlight;
+
+ for (auto it = record.Trash.begin(); it != trashEndIter; ++it) {
+ if (const TGenStep genStep(*it); genStep <= record.LastConfirmedGenStep) {
+ doNotKeep.push_back(*it);
+ } else if (nextGenStep < genStep) {
+ Y_FAIL();
+ }
+ trashInFlight.push_back(*it);
+ }
+
+ const TLogoBlobID keepFrom(Self->TabletID(), record.LastConfirmedGenStep.Generation(),
+ record.LastConfirmedGenStep.Step(), record.Channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie,
+ TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode);
+ for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && TGenStep(*it) <= nextGenStep; ++it) {
+ Y_VERIFY(record.LastConfirmedGenStep < TGenStep(*it));
+ keep.push_back(*it);
+ }
+
+ const bool collect = nextGenStep > record.LastConfirmedGenStep;
+
+ if (trashInFlight.empty()) {
+ Y_VERIFY(keep.empty()); // nothing to do here
+ } else {
+ auto keep_ = keep ? std::make_unique<TVector<TLogoBlobID>>(std::move(keep)) : nullptr;
+ auto doNotKeep_ = doNotKeep ? std::make_unique<TVector<TLogoBlobID>>(std::move(doNotKeep)) : nullptr;
+
+ auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(Self->TabletID(), generation,
+ record.PerGenerationCounter, record.Channel, collect, nextGenStep.Generation(), nextGenStep.Step(),
+ keep_.get(), doNotKeep_.get(), TInstant::Max(), true);
+
+ keep_.release();
+ doNotKeep_.release();
+
+ record.CollectGarbageRequestInFlight = true;
+ record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0;
+ record.TrashInFlight.swap(trashInFlight);
+ record.IssuedGenStep = Max(nextGenStep, record.LastConfirmedGenStep);
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "issuing TEvCollectGarbage", (Id, Self->GetLogId()),
+ (Channel, int(record.Channel)), (GroupId, record.GroupId), (Msg, ev->ToString()),
+ (LastConfirmedGenStep, record.LastConfirmedGenStep), (IssuedGenStep, record.IssuedGenStep),
+ (TrashInFlight.size, record.TrashInFlight.size()));
+
+ const ui64 id = ++LastCollectCmdId;
+ CollectCmdToGroup.emplace(id, record.GroupId);
+
+ if (collect) {
+ ExecuteIssueGC(record.Channel, record.GroupId, record.IssuedGenStep, std::move(ev), id);
+ } else {
+ SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), id);
+ }
+ }
+
+ for (auto& [agentId, ev] : outbox) {
+ TAgent& agent = Self->GetAgent(agentId);
+ const ui64 id = ++agent.LastRequestId;
+ auto& request = agent.InvalidateStepRequests[id];
+ for (const auto& item : ev->Record.GetInvalidatedSteps()) {
+ request[item.GetChannel()] = item.GetInvalidatedStep();
+ }
+
+ Y_VERIFY(agent.AgentId);
+ agent.PushCallbacks.emplace(id, std::bind(&TData::OnPushNotifyResult, this, std::placeholders::_1));
+ TActivationContext::Send(new IEventHandle(*agent.AgentId, Self->SelfId(), ev.release(), 0, id));
+ }
+ }
+
+ void TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "TEvCollectGarbageResult", (Id, Self->GetLogId()),
+ (Channel, ev->Get()->Channel), (GroupId, ev->Cookie), (Msg, ev->Get()->ToString()));
+
+ auto cmd = CollectCmdToGroup.extract(ev->Cookie);
+ Y_VERIFY(cmd);
+ const ui32 groupId = cmd.mapped();
+
+ const auto& key = std::make_tuple(ev->Get()->Channel, groupId);
+ const auto it = RecordsPerChannelGroup.find(key);
+ Y_VERIFY(it != RecordsPerChannelGroup.end());
+ auto& record = it->second;
+ if (ev->Get()->Status == NKikimrProto::OK) {
+ Y_VERIFY(record.CollectGarbageRequestInFlight);
+ record.OnSuccessfulCollect(this);
+ ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}),
+ record.LastConfirmedGenStep);
+ } else {
+ record.TrashInFlight.clear();
+ record.ClearInFlight(this);
+ }
+ }
+
+ void TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) {
+ const auto& key = std::make_tuple(channel, groupId);
+ const auto it = RecordsPerChannelGroup.find(key);
+ Y_VERIFY(it != RecordsPerChannelGroup.end());
+ it->second.ClearInFlight(this);
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data_uncertain.cpp b/ydb/core/blob_depot/data_uncertain.cpp
index 5474a98f833..5404c3455dc 100644
--- a/ydb/core/blob_depot/data_uncertain.cpp
+++ b/ydb/core/blob_depot/data_uncertain.cpp
@@ -1,5 +1,6 @@
#include "data.h"
#include "data_uncertain.h"
+#include "mon_main.h"
namespace NKikimr::NBlobDepot {
@@ -32,6 +33,8 @@ namespace NKikimr::NBlobDepot {
if (entry->NumUncertainKeys == 0) {
// we had no more uncertain keys to resolve
entry->Result.Send(Self->SelfId(), NKikimrProto::OK, std::nullopt);
+ } else {
+ NumKeysQueried += entry->NumUncertainKeys;
}
}
@@ -47,6 +50,7 @@ namespace NKikimr::NBlobDepot {
void TData::TUncertaintyResolver::DropKey(const TKey& key) {
FinishKey(key, false);
+ ++NumKeysDropped;
}
void TData::TUncertaintyResolver::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
@@ -97,6 +101,7 @@ namespace NKikimr::NBlobDepot {
const ui32 groupId = Self->Info()->GroupFor(id.Channel(), id.Generation());
SendToBSProxy(Self->SelfId(), groupId, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead, true, true));
+ ++NumGetsIssued;
}
okay = false;
@@ -116,10 +121,12 @@ namespace NKikimr::NBlobDepot {
case EKeyBlobState::WASNT_WRITTEN:
// the blob hasn't been written completely; this may also be a race when it is being written
// right now, but we are asking for the data too early (like in scan request)
+ okay = false;
break;
case EKeyBlobState::ERROR:
// we can't figure out this blob's state
+ okay = false;
break;
}
});
@@ -139,6 +146,8 @@ namespace NKikimr::NBlobDepot {
return;
}
+ ++(success ? NumKeysResolved : NumKeysUnresolved);
+
auto item = Keys.extract(keyIt);
auto& keyContext = item.mapped();
@@ -166,4 +175,18 @@ namespace NKikimr::NBlobDepot {
}
}
+ void TData::TUncertaintyResolver::RenderMainPage(IOutputStream& s) {
+ HTML(s) {
+ KEYVALUE_TABLE({
+ KEYVALUE_P("Keys queried", NumKeysQueried);
+ KEYVALUE_P("Gets issued", NumGetsIssued);
+ KEYVALUE_P("Keys resolved", NumKeysResolved);
+ KEYVALUE_P("Keys unresolved", NumKeysUnresolved);
+ KEYVALUE_P("Keys dropped", NumKeysDropped);
+ KEYVALUE_P("Keys being processed", Keys.size());
+ KEYVALUE_P("Blobs in flight", Blobs.size());
+ })
+ }
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data_uncertain.h b/ydb/core/blob_depot/data_uncertain.h
index 66ca3a7ab6d..da42cfc47ce 100644
--- a/ydb/core/blob_depot/data_uncertain.h
+++ b/ydb/core/blob_depot/data_uncertain.h
@@ -48,6 +48,12 @@ namespace NKikimr::NBlobDepot {
TKeys Keys;
std::unordered_map<TLogoBlobID, TBlobContext> Blobs;
+ ui64 NumKeysQueried = 0;
+ ui64 NumGetsIssued = 0;
+ ui64 NumKeysResolved = 0;
+ ui64 NumKeysUnresolved = 0;
+ ui64 NumKeysDropped = 0;
+
public:
TUncertaintyResolver(TBlobDepot *self);
void PushResultWithUncertainties(TResolveResultAccumulator&& result, std::deque<TKey>&& uncertainties);
@@ -56,6 +62,8 @@ namespace NKikimr::NBlobDepot {
void DropKey(const TKey& key);
void Handle(TEvBlobStorage::TEvGetResult::TPtr ev);
+ void RenderMainPage(IOutputStream& s);
+
private:
void FinishBlob(TLogoBlobID id, EKeyBlobState state);
void CheckAndFinishKeyIfPossible(TKeys::value_type *keyRecord);
diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp
index b90254400bc..62f7088b605 100644
--- a/ydb/core/blob_depot/garbage_collection.cpp
+++ b/ydb/core/blob_depot/garbage_collection.cpp
@@ -296,7 +296,6 @@ namespace NKikimr::NBlobDepot {
error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(error));
TActivationContext::Send(response.release());
barrier.ProcessingQ.pop_front();
- Self->Data->HandleTrash();
if (!barrier.ProcessingQ.empty()) {
Self->Execute(std::make_unique<TTxCollectGarbage>(Self, tabletId, channel));
}
diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp
index 2cb2f3c0c81..28d4711477e 100644
--- a/ydb/core/blob_depot/mon_main.cpp
+++ b/ydb/core/blob_depot/mon_main.cpp
@@ -3,6 +3,7 @@
#include "garbage_collection.h"
#include "blocks.h"
#include "space_monitor.h"
+#include "mon_main.h"
namespace NKikimr::NBlobDepot {
@@ -113,6 +114,10 @@ namespace NKikimr::NBlobDepot {
}
void RenderDataTable(bool header) {
+ std::optional<TData::TKey> seek;
+ ui32 rowsAfter = 100;
+ ui32 rowsBefore = 0;
+
HTML(Stream) {
if (header) {
TABLEH() { Stream << "key"; }
@@ -120,7 +125,73 @@ namespace NKikimr::NBlobDepot {
TABLEH() { Stream << "keep state"; }
TABLEH() { Stream << "barrier"; }
} else {
- Self->Data->ScanRange(nullptr, nullptr, 0, [&](const TData::TKey& key, const TData::TValue& value) {
+ const TCgiParameters& cgi = Request->Get()->Cgi();
+ if (cgi.Has("seek")) {
+ if (const TString& value = cgi.Get("seek")) {
+ if (Self->Config.HasVirtualGroupId()) {
+ TString error;
+ TLogoBlobID id;
+ if (TLogoBlobID::Parse(id, value, error)) {
+ seek.emplace(id);
+ } else {
+ DIV() {
+ Stream << "invalid seek value: " << error;
+ }
+ }
+ } else {
+ seek.emplace(value);
+ }
+ }
+ }
+ if (cgi.Has("rowsBefore") && !TryFromString(cgi.Get("rowsBefore"), rowsBefore)) {
+ DIV() {
+ Stream << "invalid rowsBefore value";
+ }
+ }
+ if (cgi.Has("rowsAfter") && !TryFromString(cgi.Get("rowsAfter"), rowsAfter)) {
+ DIV() {
+ Stream << "invalid rowsAfter value";
+ }
+ }
+
+ FORM_CLASS("form-horizontal") {
+ DIV_CLASS("control-group") {
+ LABEL_CLASS_FOR("control-label", "inputSeek") {
+ Stream << "Seek";
+ }
+ DIV_CLASS("controls") {
+ Stream << "<input id='inputSeek' name='seek' type='text' value='" << cgi.Get("seek") << "'/>";
+ }
+ }
+ DIV_CLASS("control-group") {
+ LABEL_CLASS_FOR("control-label", "inputRowsBefore") {
+ Stream << "Rows before";
+ }
+ DIV_CLASS("controls") {
+ Stream << "<input id='inputRowsBefore' name='rowsBefore' type='number' value='" <<
+ rowsBefore << "'/>";
+ }
+ }
+ DIV_CLASS("control-group") {
+ LABEL_CLASS_FOR("control-label", "inputRowsAfter") {
+ Stream << "Rows after";
+ }
+ DIV_CLASS("controls") {
+ Stream << "<input id='inputRowsAfter' name='rowsAfter' type='number' value='" <<
+ rowsAfter << "'/>";
+ }
+ }
+ DIV_CLASS("control-group") {
+ DIV_CLASS("controls") {
+ Stream << "<input type='hidden' name='TabletID' value='" << Self->TabletID() << "'/>";
+ Stream << "<input type='hidden' name='page' value='data'/>";
+ Stream << "<input type='hidden' name='table' value='data'/>";
+ Stream << "<button type='submit' class='btn btn-default'>Show</button>";
+ }
+ }
+ }
+
+ Self->Data->ShowRange(seek, rowsBefore, rowsAfter, [&](const TData::TKey& key, const TData::TValue& value) {
TABLER() {
TABLED() {
key.Output(Stream);
@@ -158,7 +229,6 @@ namespace NKikimr::NBlobDepot {
}
}
}
- return true;
});
}
}
@@ -261,19 +331,12 @@ namespace NKikimr::NBlobDepot {
space.try_emplace(groupId);
}
- auto outSize = [&](ui64 size) {
- static const char *suffixes[] = {
- "B", "KiB", "MiB", "GiB", "TiB", "PiB", nullptr
- };
- FormatHumanReadable(Stream, size, 1024, 2, suffixes);
- };
-
for (const auto& [groupId, value] : space) {
const auto& [current, total] = value;
TABLER() {
TABLED() { Stream << groupId; }
- TABLED() { outSize(current); }
- TABLED() { outSize(total); }
+ TABLED() { Stream << FormatByteSize(current); }
+ TABLED() { Stream << FormatByteSize(total); }
const auto it = Self->SpaceMonitor->Groups.find(groupId);
if (it != Self->SpaceMonitor->Groups.end()) {
@@ -345,61 +408,47 @@ namespace NKikimr::NBlobDepot {
s << "Stats";
}
DIV_CLASS("panel-body") {
- TABLE_CLASS("table") {
- TABLEHEAD() {
- TABLER() {
- TABLEH() { s << "Parameter"; }
- TABLEH() { s << "Value"; }
+ KEYVALUE_TABLE({
+
+ TABLER() {
+ TABLED() { s << "Data, bytes"; }
+ TABLED() {
+ ui64 total = 0;
+ Data->EnumerateRefCount([&](TLogoBlobID id, ui32 /*refCount*/) {
+ total += id.BlobSize();
+ });
+ s << FormatByteSize(total);
}
}
- TABLEBODY() {
- auto outSize = [&](ui64 size) {
- static const char *suffixes[] = {
- "B", "KiB", "MiB", "GiB", "TiB", "PiB", nullptr
- };
- FormatHumanReadable(s, size, 1024, 2, suffixes);
- };
- TABLER() {
- TABLED() { s << "Data, bytes"; }
- TABLED() {
- ui64 total = 0;
- Data->EnumerateRefCount([&](TLogoBlobID id, ui32 /*refCount*/) {
- total += id.BlobSize();
- });
- outSize(total);
- }
- }
-
- ui64 trashInFlight = 0;
- ui64 trashPending = 0;
- Data->EnumerateTrash([&](ui32 /*groupId*/, TLogoBlobID id, bool inFlight) {
- (inFlight ? trashInFlight : trashPending) += id.BlobSize();
- });
- TABLER() {
- TABLED() { s << "Trash in flight, bytes"; }
- TABLED() { outSize(trashInFlight); }
- }
+ ui64 trashInFlight = 0;
+ ui64 trashPending = 0;
+ Data->EnumerateTrash([&](ui32 /*groupId*/, TLogoBlobID id, bool inFlight) {
+ (inFlight ? trashInFlight : trashPending) += id.BlobSize();
+ });
- TABLER() {
- TABLED() { s << "Trash pending, bytes"; }
- TABLED() { outSize(trashPending); }
- }
+ KEYVALUE_P("Trash in flight, bytes", FormatByteSize(trashInFlight));
+ KEYVALUE_P("Trash pending, bytes", FormatByteSize(trashPending));
- std::vector<ui32> groups;
- for (const auto& [groupId, _] : Groups) {
- groups.push_back(groupId);
- }
- std::sort(groups.begin(), groups.end());
- for (const ui32 groupId : groups) {
- TGroupInfo& group = Groups[groupId];
- TABLER() {
- TABLED() { s << "Data in GroupId# " << groupId << ", bytes"; }
- TABLED() { outSize(group.AllocatedBytes); }
- }
- }
+ std::vector<ui32> groups;
+ for (const auto& [groupId, _] : Groups) {
+ groups.push_back(groupId);
}
- }
+ std::sort(groups.begin(), groups.end());
+ for (const ui32 groupId : groups) {
+ TGroupInfo& group = Groups[groupId];
+ KEYVALUE_P(TStringBuilder() << "Data in GroupId# " << groupId << ", bytes",
+ FormatByteSize(group.AllocatedBytes));
+ }
+ })
+ }
+ }
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ s << "Data";
+ }
+ DIV_CLASS("panel-body") {
+ Data->RenderMainPage(s);
}
}
}
diff --git a/ydb/core/blob_depot/mon_main.h b/ydb/core/blob_depot/mon_main.h
new file mode 100644
index 00000000000..b9aa0fd39bc
--- /dev/null
+++ b/ydb/core/blob_depot/mon_main.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NKikimr::NBlobDepot {
+
+ #define KEYVALUE_TABLE(BODY) \
+ TABLE_CLASS("table") { \
+ TABLEHEAD() { \
+ TABLER() { \
+ TABLEH() { s << "Parameter"; } \
+ TABLEH() { s << "Value"; } \
+ } \
+ } \
+ TABLEBODY() { \
+ BODY \
+ } \
+ }
+
+ #define KEYVALUE_P(KEY, VALUE) \
+ TABLER() { \
+ TABLED() { __stream << (KEY); } \
+ TABLED() { __stream << (VALUE); } \
+ }
+
+ inline TString FormatByteSize(ui64 size) {
+ static const char *suffixes[] = {"B", "KiB", "MiB", "GiB", "TiB", "PiB", nullptr};
+ TStringStream s;
+ FormatHumanReadable(s, size, 1024, 2, suffixes);
+ return s.Str();
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index fa51308a05a..a8501a57d13 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -124,7 +124,6 @@ namespace NKikimr::NBlobDepot {
void Complete(const TActorContext&) override {
Self->Data->CommitTrash(this);
- Self->Data->HandleTrash();
TActivationContext::Send(Response.release());
}
};