aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-09-27 19:30:13 +0300
committeralexvru <alexvru@ydb.tech>2022-09-27 19:30:13 +0300
commit3a89ec1b09110ffccc8a2a51f746256ebb8e30c4 (patch)
tree0ff40f525d941af85a21520cfd2513ea43dd316b
parent3c36672f2b135e07e454dda026b5bc2c53f75fad (diff)
downloadydb-3a89ec1b09110ffccc8a2a51f746256ebb8e30c4.tar.gz
Fix garbage collection handling in GroupOverseer
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h1
-rw-r--r--ydb/core/blob_depot/data.cpp31
-rw-r--r--ydb/core/blob_depot/data.h15
-rw-r--r--ydb/core/blob_depot/data_gc.cpp10
-rw-r--r--ydb/core/blob_depot/testing.cpp51
-rw-r--r--ydb/core/blob_depot/testing.h1
-rw-r--r--ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp8
-rw-r--r--ydb/core/blobstorage/testing/group_overseer/group_state.cpp81
-rw-r--r--ydb/core/blobstorage/testing/group_overseer/group_state.h12
-rw-r--r--ydb/core/blobstorage/ut_testshard/env.h4
10 files changed, 171 insertions, 43 deletions
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index 42a87d1c96e..5079d824701 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -238,6 +238,7 @@ namespace NKikimr::NBlobDepot {
// Validation
void Validate(NTesting::TGroupOverseer& overseer) const;
+ void OnSuccessfulGetResult(TLogoBlobID id) const;
};
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 8aac540989f..521d52ff420 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -80,7 +80,7 @@ namespace NKikimr::NBlobDepot {
if (const auto it = Data.find(key); it == Data.end()) {
return false; // no such key existed and will not be created as it hits the barrier
} else {
- Y_VERIFY_S(!underHard && it->second.KeepState == NKikimrBlobDepot::EKeepState::Keep,
+ Y_VERIFY_S(!underHard && it->second.KeepState == EKeepState::Keep,
"barrier invariant failed Key# " << key.ToString() << " Value# " << it->second.ToString());
}
}
@@ -109,7 +109,7 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(!value.UncertainWrite || !value.ValueChain.empty());
Y_VERIFY(!inserted || outcome != EUpdateOutcome::NO_CHANGE);
- if (underSoft && value.KeepState != NKikimrBlobDepot::EKeepState::Keep) {
+ if (underSoft && value.KeepState != EKeepState::Keep) {
outcome = EUpdateOutcome::DROP;
}
@@ -288,10 +288,10 @@ namespace NKikimr::NBlobDepot {
(Value, value), (Inserted, inserted));
// update keep state if necessary
- if (blob.DoNotKeep && value.KeepState < NKikimrBlobDepot::EKeepState::DoNotKeep) {
- value.KeepState = NKikimrBlobDepot::EKeepState::DoNotKeep;
- } else if (blob.Keep && value.KeepState < NKikimrBlobDepot::EKeepState::Keep) {
- value.KeepState = NKikimrBlobDepot::EKeepState::Keep;
+ if (blob.DoNotKeep && value.KeepState < EKeepState::DoNotKeep) {
+ value.KeepState = EKeepState::DoNotKeep;
+ } else if (blob.Keep && value.KeepState < EKeepState::Keep) {
+ value.KeepState = EKeepState::Keep;
}
// if there is not value chain for this blob, map it to the original blob id
@@ -318,7 +318,7 @@ namespace NKikimr::NBlobDepot {
record.LastConfirmedGenStep = confirmedGenStep;
}
- bool TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState,
+ bool TData::UpdateKeepState(TKey key, EKeepState keepState,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
return UpdateKey(std::move(key), txc, cookie, [&](TValue& value, bool inserted) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT51, "UpdateKeepState", (Id, Self->GetLogId()), (Key, key),
@@ -445,6 +445,7 @@ namespace NKikimr::NBlobDepot {
auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(record.TabletId, generation,
record.PerGenerationCounter, record.Channel, collect, nextGenStep.Generation(), nextGenStep.Step(),
keep_.get(), doNotKeep_.get(), TInstant::Max(), true);
+
keep_.release();
doNotKeep_.release();
@@ -462,10 +463,13 @@ namespace NKikimr::NBlobDepot {
(LastConfirmedGenStep, record.LastConfirmedGenStep), (IssuedGenStep, record.IssuedGenStep),
(TrashInFlight.size, record.TrashInFlight.size()));
+ const ui64 id = ++LastCollectCmdId;
+ CollectCmdToGroup.emplace(id, record.GroupId);
+
if (collect) {
- ExecuteIssueGC(record.Channel, record.GroupId, record.IssuedGenStep, std::move(ev));
+ ExecuteIssueGC(record.Channel, record.GroupId, record.IssuedGenStep, std::move(ev), id);
} else {
- SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), record.GroupId);
+ SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), id);
}
}
@@ -486,7 +490,12 @@ namespace NKikimr::NBlobDepot {
void TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "TEvCollectGarbageResult", (Id, Self->GetLogId()),
(Channel, ev->Get()->Channel), (GroupId, ev->Cookie), (Msg, ev->Get()->ToString()));
- const auto& key = std::make_tuple(ev->Get()->TabletId, ev->Get()->Channel, ev->Cookie);
+
+ auto cmd = CollectCmdToGroup.extract(ev->Cookie);
+ Y_VERIFY(cmd);
+ const ui32 groupId = cmd.mapped();
+
+ const auto& key = std::make_tuple(ev->Get()->TabletId, ev->Get()->Channel, groupId);
const auto it = RecordsPerChannelGroup.find(key);
Y_VERIFY(it != RecordsPerChannelGroup.end());
auto& record = it->second;
@@ -577,7 +586,7 @@ namespace NKikimr::NBlobDepot {
bool finished = true;
Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_END, [&](auto& key, auto& value) {
- if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) {
+ if (value.KeepState != EKeepState::Keep || hard) {
if (maxItems) {
Self->Data->DeleteKey(key, txc, cookie);
--maxItems;
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 2ab81af1faa..aaec1dcfc95 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -10,6 +10,8 @@ namespace NKikimr::NBlobDepot {
class TBlobDepot::TData {
TBlobDepot* const Self;
+ using EKeepState = NKikimrBlobDepot::EKeepState;
+
public:
class alignas(TString) TKey {
struct TData {
@@ -219,7 +221,7 @@ namespace NKikimr::NBlobDepot {
struct TValue {
TString Meta;
TValueChain ValueChain;
- NKikimrBlobDepot::EKeepState KeepState = NKikimrBlobDepot::EKeepState::Default;
+ EKeepState KeepState = EKeepState::Default;
bool Public = false;
std::optional<TLogoBlobID> OriginalBlobId;
bool UncertainWrite = false;
@@ -252,7 +254,7 @@ namespace NKikimr::NBlobDepot {
locator->CopyFrom(item.GetBlobLocator());
}
- explicit TValue(NKikimrBlobDepot::EKeepState keepState)
+ explicit TValue(EKeepState keepState)
: KeepState(keepState)
, Public(false)
, UncertainWrite(true)
@@ -297,7 +299,7 @@ namespace NKikimr::NBlobDepot {
void Output(IOutputStream& s) const {
s << "{Meta# '" << EscapeC(Meta) << "'"
<< " ValueChain# " << FormatList(ValueChain)
- << " KeepState# " << NKikimrBlobDepot::EKeepState_Name(KeepState)
+ << " KeepState# " << EKeepState_Name(KeepState)
<< " Public# " << (Public ? "true" : "false")
<< " OriginalBlobId# " << (OriginalBlobId ? OriginalBlobId->ToString() : "<none>")
<< " UncertainWrite# " << (UncertainWrite ? "true" : "false")
@@ -381,6 +383,9 @@ namespace NKikimr::NBlobDepot {
std::deque<TKey> KeysMadeCertain; // but not yet committed
bool CommitCertainKeysScheduled = false;
+ ui64 LastCollectCmdId = 0;
+ std::unordered_map<ui64, ui32> CollectCmdToGroup;
+
public:
TData(TBlobDepot *self);
~TData();
@@ -435,7 +440,7 @@ namespace NKikimr::NBlobDepot {
void AddTrashOnLoad(TLogoBlobID id);
void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep);
- bool UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState,
+ bool UpdateKeepState(TKey key, EKeepState keepState,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
void DeleteKey(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
void CommitTrash(void *cookie);
@@ -478,7 +483,7 @@ namespace NKikimr::NBlobDepot {
private:
void ExecuteIssueGC(ui8 channel, ui32 groupId, TGenStep issuedGenStep,
- std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage);
+ std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage, ui64 cookie);
void ExecuteConfirmGC(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, TGenStep confirmedGenStep);
};
diff --git a/ydb/core/blob_depot/data_gc.cpp b/ydb/core/blob_depot/data_gc.cpp
index 4eff9208f76..8d17f3dd9b5 100644
--- a/ydb/core/blob_depot/data_gc.cpp
+++ b/ydb/core/blob_depot/data_gc.cpp
@@ -10,15 +10,17 @@ namespace NKikimr::NBlobDepot {
const ui32 GroupId;
const TGenStep IssuedGenStep;
std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> CollectGarbage;
+ const ui64 Cookie;
public:
TTxIssueGC(TBlobDepot *self, ui8 channel, ui32 groupId, TGenStep issuedGenStep,
- std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage)
+ std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage, ui64 cookie)
: TTransactionBase(self)
, Channel(channel)
, GroupId(groupId)
, IssuedGenStep(issuedGenStep)
, CollectGarbage(std::move(collectGarbage))
+ , Cookie(cookie)
{}
bool Execute(TTransactionContext& txc, const TActorContext&) override {
@@ -28,13 +30,13 @@ namespace NKikimr::NBlobDepot {
}
void Complete(const TActorContext&) override {
- SendToBSProxy(Self->SelfId(), GroupId, CollectGarbage.release(), GroupId);
+ SendToBSProxy(Self->SelfId(), GroupId, CollectGarbage.release(), Cookie);
}
};
void TData::ExecuteIssueGC(ui8 channel, ui32 groupId, TGenStep issuedGenStep,
- std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage) {
- Self->Execute(std::make_unique<TTxIssueGC>(Self, channel, groupId, issuedGenStep, std::move(collectGarbage)));
+ std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage, ui64 cookie) {
+ Self->Execute(std::make_unique<TTxIssueGC>(Self, channel, groupId, issuedGenStep, std::move(collectGarbage), cookie));
}
class TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
diff --git a/ydb/core/blob_depot/testing.cpp b/ydb/core/blob_depot/testing.cpp
index 27cf6fe6e6f..9c66c2ee152 100644
--- a/ydb/core/blob_depot/testing.cpp
+++ b/ydb/core/blob_depot/testing.cpp
@@ -1,5 +1,6 @@
#include "testing.h"
#include "blob_depot_tablet.h"
+#include "data.h"
namespace NKikimr::NBlobDepot {
@@ -15,13 +16,53 @@ namespace NKikimr::NBlobDepot {
}
}
+ void OnSuccessfulGetResult(IActor *actor, TLogoBlobID id) {
+ if (auto *x = dynamic_cast<TBlobDepot*>(actor)) {
+ x->OnSuccessfulGetResult(id);
+ } else {
+ Y_FAIL();
+ }
+ }
+
void TBlobDepot::Validate(NTesting::TGroupOverseer& overseer) const {
Y_VERIFY(Config.HasVirtualGroupId());
- (void)overseer;
-// overseer.EnumerateBlobs(Config.GetVirtualGroupId(), [&](TLogoBlobID id, NTesting::EBlobState state) {
-// Cerr << id.ToString() << Endl;
-// (void)state;
-// });
+ overseer.EnumerateBlobs(Config.GetVirtualGroupId(), [&](TLogoBlobID userId, NTesting::EBlobState userState) {
+ switch (userState) {
+ case NTesting::EBlobState::NOT_WRITTEN:
+ case NTesting::EBlobState::CERTAINLY_COLLECTED_OR_NEVER_WRITTEN:
+ Y_FAIL();
+
+ case NTesting::EBlobState::POSSIBLY_WRITTEN:
+ break;
+
+ case NTesting::EBlobState::CERTAINLY_WRITTEN: {
+ Cerr << userId.ToString() << Endl;
+ const TData::TKey key(userId);
+ const TData::TValue *value = Data->FindKey(key);
+ Y_VERIFY(value); // key must exist
+ ui32 numDataBytes = 0;
+ EnumerateBlobsForValueChain(value->ValueChain, TabletID(), [&](TLogoBlobID id, ui32, ui32 size) {
+ const ui32 groupId = Info()->GroupFor(id.Channel(), id.Generation());
+ const auto state = overseer.GetBlobState(groupId, id);
+ Y_VERIFY_S(state == NTesting::EBlobState::CERTAINLY_WRITTEN
+ || state == NTesting::EBlobState::POSSIBLY_WRITTEN,
+ "UserId# " << userId.ToString() << " UserState# " << (int)userState
+ << " Id# " << id.ToString() << " State# " << (int)state);
+ // FIXME(alexvru): handle this case somehow?
+ numDataBytes += size;
+ });
+ Y_VERIFY(numDataBytes == userId.BlobSize());
+ break;
+ }
+
+ case NTesting::EBlobState::POSSIBLY_COLLECTED:
+ break;
+ }
+ });
+ }
+
+ void TBlobDepot::OnSuccessfulGetResult(TLogoBlobID id) const {
+ (void)id; // FIXME(alexvru): handle race with blob deletion
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/testing.h b/ydb/core/blob_depot/testing.h
index 04d5d87646a..a41141c0153 100644
--- a/ydb/core/blob_depot/testing.h
+++ b/ydb/core/blob_depot/testing.h
@@ -12,5 +12,6 @@ namespace NKikimr::NBlobDepot {
bool IsBlobDepotActor(IActor *actor);
void ValidateBlobDepot(IActor *actor, NTesting::TGroupOverseer& overseer);
+ void OnSuccessfulGetResult(IActor *actor, TLogoBlobID id);
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp b/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp
index fe662e5249e..852119018a8 100644
--- a/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp
+++ b/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp
@@ -12,7 +12,7 @@ namespace NKikimr::NTesting {
void AddGroupToOversee(ui32 groupId) {
const TActorId proxyId = MakeBlobStorageProxyID(groupId);
OverseenServiceMap[proxyId] = groupId;
- GroupStates.try_emplace(groupId);
+ GroupStates.try_emplace(groupId, groupId);
}
void ExamineEvent(ui32 nodeId, IEventHandle& ev) {
@@ -75,7 +75,8 @@ namespace NKikimr::NTesting {
const TQueryId queryId{nodeId, resultEventType, ev.Sender, ev.Cookie};
const auto [_, inserted] = QueryToGroup.emplace(queryId, groupId);
if (inserted) {
- GroupStates[groupId].ExamineQueryEvent(queryId, *ev.Get<T>());
+ const auto groupStateIt = GroupStates.try_emplace(groupId, groupId).first;
+ groupStateIt->second.ExamineQueryEvent(queryId, *ev.Get<T>());
}
}
@@ -99,7 +100,8 @@ namespace NKikimr::NTesting {
Y_VERIFY(groupId == msg.GroupId);
}
- GroupStates[groupId].ExamineResultEvent(queryId, msg);
+ const auto groupStateIt = GroupStates.try_emplace(groupId, groupId).first;
+ groupStateIt->second.ExamineResultEvent(queryId, msg);
}
};
diff --git a/ydb/core/blobstorage/testing/group_overseer/group_state.cpp b/ydb/core/blobstorage/testing/group_overseer/group_state.cpp
index 469017496bd..839781449b6 100644
--- a/ydb/core/blobstorage/testing/group_overseer/group_state.cpp
+++ b/ydb/core/blobstorage/testing/group_overseer/group_state.cpp
@@ -42,6 +42,8 @@ namespace NKikimr::NTesting {
template<>
void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvPut& msg) {
+ Log(TStringBuilder() << "TEvPut " << msg.ToString());
+
TBlobInfo& blob = *LookupBlob(msg.Id, true);
TBlobValueHash valueHash(msg);
@@ -74,6 +76,8 @@ namespace NKikimr::NTesting {
template<>
void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvPutResult& msg) {
+ Log(TStringBuilder() << "TEvPutResult " << msg.ToString());
+
const auto it = Blobs.find(msg.Id);
if (msg.Status != NKikimrProto::OK && it == Blobs.end()) {
Y_VERIFY(GetBlobState(msg.Id) == EBlobState::CERTAINLY_COLLECTED_OR_NEVER_WRITTEN);
@@ -131,16 +135,20 @@ namespace NKikimr::NTesting {
template<>
void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvCollectGarbage& msg) {
+ Log(TStringBuilder() << "TEvCollectGarbage " << msg.ToString());
+
auto processFlags = [&](TVector<TLogoBlobID> *vector, bool isKeepFlag) {
- for (const TLogoBlobID& id : vector ? *vector : TVector<TLogoBlobID>()) {
- TBlobInfo& blob = *LookupBlob(id, true);
- ++(isKeepFlag ? blob.NumKeepsInFlight : blob.NumDoNotKeepsInFlight);
- FlagsInFlight.emplace(queryId, std::make_tuple(isKeepFlag, id));
+ if (vector) {
+ for (const TLogoBlobID& id : *vector) {
+ TBlobInfo& blob = *LookupBlob(id, true);
+ ++(isKeepFlag ? blob.NumKeepsInFlight : blob.NumDoNotKeepsInFlight);
+ FlagsInFlight.emplace(queryId, std::make_tuple(isKeepFlag, id));
+ }
}
};
processFlags(msg.Keep.Get(), true);
- processFlags(msg.DoNotKeep.Get(), true);
+ processFlags(msg.DoNotKeep.Get(), false);
if (msg.Collect) {
const TBarrierId id(msg.TabletId, msg.Channel);
@@ -163,6 +171,10 @@ namespace NKikimr::NTesting {
template<>
void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvCollectGarbageResult& msg) {
+ Log(TStringBuilder() << "TEvCollectGarbageResult " << msg.ToString());
+
+ std::set<TLogoBlobID> idsToExamine;
+
auto [begin, end] = FlagsInFlight.equal_range(queryId);
for (auto it = begin; it != end; ++it) {
const auto& [isKeepFlag, id] = it->second;
@@ -171,6 +183,7 @@ namespace NKikimr::NTesting {
if (msg.Status == NKikimrProto::OK) {
(isKeepFlag ? blob.ConfirmedKeep : blob.ConfirmedDoNotKeep) = true;
}
+ idsToExamine.insert(id);
}
FlagsInFlight.erase(begin, end);
@@ -182,11 +195,21 @@ namespace NKikimr::NTesting {
auto inFlight = barrier.InFlight[hard].extract(inFlightIt);
Y_VERIFY(inFlight);
if (msg.Status == NKikimrProto::OK) {
+ const auto& value = inFlight.value().Value;
+ std::optional<std::tuple<ui32, ui32>> prev;
if (auto& dest = barrier.Confirmed[hard]) {
- dest->Supersede(inFlight.value().Value);
+ prev.emplace(dest->GetCollectGenStep());
+ dest->Supersede(value);
} else {
- dest.emplace(inFlight.value().Value);
+ dest.emplace(value);
}
+ ApplyBarrier(barrierId, prev, value.GetCollectGenStep());
+ }
+ }
+
+ for (const TLogoBlobID& id : idsToExamine) {
+ if (const auto it = Blobs.find(id); it != Blobs.end() && IsCollected(id, &it->second) == EConfidence::CONFIRMED) {
+ Blobs.erase(id);
}
}
}
@@ -233,6 +256,16 @@ namespace NKikimr::NTesting {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Common parts
+ TGroupState::TGroupState(ui32 groupId)
+ : LogPrefix(TStringBuilder() << "[" << groupId << "] ")
+ {}
+
+ void TGroupState::Log(TString message) const {
+ if (LogPrefix) {
+ Cerr << LogPrefix << message << Endl;
+ }
+ }
+
TGroupState::EConfidence TGroupState::TBlockInfo::IsBlocked(ui32 generation) const {
if (Confirmed && generation <= Confirmed->Generation) {
return EConfidence::CONFIRMED;
@@ -311,6 +344,34 @@ namespace NKikimr::NTesting {
return result;
}
+ TGroupState::EConfidence TGroupState::IsCollected(TLogoBlobID id, const TBlobInfo *blob) const {
+ const EConfidence keep = blob->ConfirmedKeep ? EConfidence::CONFIRMED :
+ blob->NumKeepsInFlight ? EConfidence::POSSIBLE : EConfidence::SURELY_NOT;
+ const EConfidence doNotKeep = blob->ConfirmedDoNotKeep ? EConfidence::CONFIRMED :
+ blob->NumDoNotKeepsInFlight ? EConfidence::POSSIBLE : EConfidence::SURELY_NOT;
+ return IsCollected(id, keep, doNotKeep);
+ }
+
+ void TGroupState::ApplyBarrier(TBarrierId barrierId, std::optional<std::tuple<ui32, ui32>> prevGenStep,
+ std::tuple<ui32, ui32> collectGenStep) {
+ const auto& [tabletId, channel] = barrierId;
+ auto it = prevGenStep
+ ? Blobs.upper_bound(TLogoBlobID(tabletId, std::get<0>(*prevGenStep), std::get<1>(*prevGenStep), channel,
+ TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode))
+ : Blobs.lower_bound(TLogoBlobID(tabletId, 0, 0, channel, 0, 0));
+ auto key = [](const TLogoBlobID& id) {
+ return std::make_tuple(id.TabletID(), id.Channel(), id.Generation(), id.Step());
+ };
+ const auto maxKey = std::tuple_cat(std::make_tuple(tabletId, channel), collectGenStep);
+ while (it != Blobs.end() && key(it->first) <= maxKey) {
+ if (IsCollected(it->first, &it->second) == EConfidence::CONFIRMED) {
+ it = Blobs.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
+
TGroupState::TBlobInfo *TGroupState::LookupBlob(TLogoBlobID id, bool create) const {
Y_VERIFY(id.BlobSize() != 0);
Y_VERIFY(id.PartId() == 0);
@@ -347,11 +408,7 @@ namespace NKikimr::NTesting {
if (!blob->ValueHash) {
return EBlobState::NOT_WRITTEN;
}
- const EConfidence keep = blob->ConfirmedKeep ? EConfidence::CONFIRMED :
- blob->NumKeepsInFlight ? EConfidence::POSSIBLE : EConfidence::SURELY_NOT;
- const EConfidence doNotKeep = blob->ConfirmedDoNotKeep ? EConfidence::CONFIRMED :
- blob->NumDoNotKeepsInFlight ? EConfidence::POSSIBLE : EConfidence::SURELY_NOT;
- switch (IsCollected(id, keep, doNotKeep)) {
+ switch (IsCollected(id, blob)) {
case EConfidence::SURELY_NOT:
return blob->ConfirmedValue ? EBlobState::CERTAINLY_WRITTEN : EBlobState::POSSIBLY_WRITTEN;
diff --git a/ydb/core/blobstorage/testing/group_overseer/group_state.h b/ydb/core/blobstorage/testing/group_overseer/group_state.h
index f71655d4ddf..4a9fc651425 100644
--- a/ydb/core/blobstorage/testing/group_overseer/group_state.h
+++ b/ydb/core/blobstorage/testing/group_overseer/group_state.h
@@ -107,7 +107,11 @@ namespace NKikimr::NTesting {
std::unordered_map<TBarrierId, TBarrierInfo> Barriers;
std::unordered_multimap<TQueryId, std::tuple<bool, TLogoBlobID>> FlagsInFlight;
+ struct TBlobInfo;
EConfidence IsCollected(TLogoBlobID id, EConfidence keep, EConfidence doNotKeep) const;
+ EConfidence IsCollected(TLogoBlobID id, const TBlobInfo *blob) const;
+ void ApplyBarrier(TBarrierId barrierId, std::optional<std::tuple<ui32, ui32>> prevGenStep,
+ std::tuple<ui32, ui32> collectGenStep);
struct TBlobValueHash {
ui64 Low;
@@ -134,7 +138,6 @@ namespace NKikimr::NTesting {
bool ConfirmedDoNotKeep = false;
ui32 NumKeepsInFlight = 0; // number of CollectGarbage requests in flight with Keep flag for this blob
ui32 NumDoNotKeepsInFlight = 0; // the same, but for DoNotKeep flag
- EBlobState BlobState = EBlobState::NOT_WRITTEN;
struct TQueryContext {
bool IsBlocked = false; // was the request already blocked when the Put got issued?
@@ -147,7 +150,11 @@ namespace NKikimr::NTesting {
TBlobInfo *LookupBlob(TLogoBlobID id, bool create) const;
+ void Log(TString message) const;
+
public:
+ TGroupState(ui32 groupId);
+
template<typename T>
void ExamineQueryEvent(const TQueryId& queryId, const T& msg);
@@ -156,6 +163,9 @@ namespace NKikimr::NTesting {
EBlobState GetBlobState(TLogoBlobID id, const TBlobInfo *blob = nullptr) const;
void EnumerateBlobs(const std::function<void(TLogoBlobID, EBlobState)>& callback) const;
+
+ private:
+ TString LogPrefix;
};
} // NKikimr::NTesting
diff --git a/ydb/core/blobstorage/ut_testshard/env.h b/ydb/core/blobstorage/ut_testshard/env.h
index 6a55403faf5..6d251269b21 100644
--- a/ydb/core/blobstorage/ut_testshard/env.h
+++ b/ydb/core/blobstorage/ut_testshard/env.h
@@ -185,8 +185,8 @@ struct TEnvironmentSetup {
// NKikimrServices::BS_PROXY_INDEXRESTOREGET,
// NKikimrServices::BS_PROXY_STATUS,
NActorsServices::TEST,
-// NKikimrServices::BLOB_DEPOT,
-// NKikimrServices::BLOB_DEPOT_AGENT,
+ NKikimrServices::BLOB_DEPOT,
+ NKikimrServices::BLOB_DEPOT_AGENT,
// NKikimrServices::HIVE,
// NKikimrServices::LOCAL,
// NKikimrServices::TEST_SHARD,