about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author alexvru <alexvru@ydb.tech> 2022-07-13 22:12:17 +0300
committer alexvru <alexvru@ydb.tech> 2022-07-13 22:12:17 +0300
commit bd3fd08805f8f6a5d618885c38f57f1d498d682f (patch)
tree 1c9b24b140238eee5b187c07b07249f46b9d88c1
parent a3311ac7b9ea1ade9487351839b35bfab06583d3 (diff)
download ydb-bd3fd08805f8f6a5d618885c38f57f1d498d682f.tar.gz
BlobDepot work in progress
-rw-r--r-- ydb/core/blob_depot/agent.cpp 25
-rw-r--r-- ydb/core/blob_depot/agent/agent_impl.h 2
-rw-r--r-- ydb/core/blob_depot/agent/read.cpp 19
-rw-r--r-- ydb/core/blob_depot/agent/request.cpp 27
-rw-r--r-- ydb/core/blob_depot/data.cpp 95
-rw-r--r-- ydb/core/blob_depot/data.h 9
-rw-r--r-- ydb/core/blob_depot/garbage_collection.cpp 88
-rw-r--r-- ydb/core/blob_depot/garbage_collection.h 15
-rw-r--r-- ydb/core/blob_depot/given_id_range.cpp 48
-rw-r--r-- ydb/core/blob_depot/mon_main.cpp 10
-rw-r--r-- ydb/core/blob_depot/op_commit_blob_seq.cpp 24
-rw-r--r-- ydb/core/blob_depot/op_load.cpp 21
-rw-r--r-- ydb/core/blob_depot/schema.h 19
-rw-r--r-- ydb/core/blob_depot/types.h 68
14 files changed, 322 insertions, 148 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index 7cecc58d313..68de7977909 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -4,13 +4,15 @@
namespace NKikimr::NBlobDepot {
void TBlobDepot::Handle(TEvTabletPipe::TEvServerConnected::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvServerConnected", (TabletId, TabletID()), (Msg, ev->Get()->ToString()));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvServerConnected", (TabletId, TabletID()),
+ (PipeServerId, ev->Get()->ServerId));
const auto [it, inserted] = PipeServerToNode.emplace(ev->Get()->ServerId, std::nullopt);
Y_VERIFY(inserted);
}
void TBlobDepot::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT02, "TEvServerDisconnected", (TabletId, TabletID()), (Msg, ev->Get()->ToString()));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT02, "TEvServerDisconnected", (TabletId, TabletID()),
+ (PipeServerId, ev->Get()->ServerId));
const auto it = PipeServerToNode.find(ev->Get()->ServerId);
Y_VERIFY(it != PipeServerToNode.end());
if (const auto& nodeId = it->second) {
@@ -31,17 +33,18 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) {
+ const ui32 nodeId = ev->Sender.NodeId();
+ const TActorId& pipeServerId = ev->Recipient;
const auto& req = ev->Get()->Record;
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, req), (NodeId, nodeId),
+ (PipeServerId, pipeServerId));
- const auto it = PipeServerToNode.find(ev->Recipient);
+ const auto it = PipeServerToNode.find(pipeServerId);
Y_VERIFY(it != PipeServerToNode.end());
- const ui32 nodeId = ev->Sender.NodeId();
Y_VERIFY(!it->second || *it->second == nodeId);
it->second = nodeId;
auto& agent = Agents[nodeId];
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, req), (NodeId, nodeId),
- (PipeServerId, it->first));
- agent.ConnectedAgent = it->first;
+ agent.ConnectedAgent = pipeServerId;
agent.ConnectedNodeId = nodeId;
agent.ExpirationTimestamp = TInstant::Max();
@@ -125,6 +128,9 @@ namespace NKikimr::NBlobDepot {
for (const auto& range : givenIdRange->GetChannelRanges()) {
agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd());
Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd());
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "IssueNewRange", (TabletId, TabletID()),
+ (AgentId, agent.ConnectedNodeId), (Channel, range.GetChannel()),
+ (Begin, range.GetBegin()), (End, range.GetEnd()));
}
}
@@ -149,6 +155,11 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::ResetAgent(TAgent& agent) {
for (auto& [channel, agentGivenIdRange] : agent.GivenIdRanges) {
Channels[channel].GivenIdRanges.Subtract(agentGivenIdRange);
+ const ui32 channel_ = channel;
+ const auto& agentGivenIdRange_ = agentGivenIdRange;
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "ResetAgent", (TabletId, TabletID()), (AgentId, agent.ConnectedNodeId),
+ (Channel, channel_), (GivenIdRanges, Channels[channel_].GivenIdRanges),
+ (Agent.GivenIdRanges, agentGivenIdRange_));
agentGivenIdRange = {};
}
Data->HandleTrash();
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index dc3068976d9..92984f132c6 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -61,6 +61,8 @@ namespace NKikimr::NBlobDepot {
TEvBlobStorage::TEvPutResult*
>;
+ static TString ToString(const TResponse& response);
+
public:
TRequestSender(TBlobDepotAgent& agent);
virtual ~TRequestSender();
diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp
index 941a9258328..1d61329ae3d 100644
--- a/ydb/core/blob_depot/agent/read.cpp
+++ b/ydb/core/blob_depot/agent/read.cpp
@@ -58,14 +58,17 @@ namespace NKikimr::NBlobDepot {
return false;
}
- const ui64 partLen = end - begin;
+ // calculate the whole length of current part
+ ui64 partLen = end - begin;
if (offset >= partLen) {
// just skip this part
offset -= partLen;
continue;
}
- const ui64 partSize = Min(size ? size : Max<ui64>(), partLen - offset);
+ // adjust it to fit size and offset
+ partLen = Min(size ? size : Max<ui64>(), partLen - offset);
+ Y_VERIFY(partLen);
auto blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId());
@@ -75,19 +78,20 @@ namespace NKikimr::NBlobDepot {
const ui32 blobSize = totalDataLen + (composite ? sizeof(TVirtualGroupBlobFooter) : 0);
const auto id = blobSeqId.MakeBlobId(TabletId, type, 0, blobSize);
items.push_back(TReadItem{locator.GetGroupId(), id, static_cast<ui32>(offset + begin),
- static_cast<ui32>(partSize), outputOffset});
+ static_cast<ui32>(partLen), outputOffset});
} else {
Y_FAIL();
}
+ outputOffset += partLen;
+ offset = 0;
+
if (size) {
- size -= partSize;
+ size -= partLen;
if (!size) {
break;
}
}
- offset = 0;
- outputOffset += partSize;
}
if (size) {
@@ -130,7 +134,8 @@ namespace NKikimr::NBlobDepot {
readContext.ReadOffsets.erase(it);
for (const ui64 offset : v) {
- Y_VERIFY(offset + blob.Buffer.size() <= readContext.Size);
+ Y_VERIFY_S(offset + blob.Buffer.size() <= readContext.Size, "offset# " << offset << " Buffer.size# "
+ << blob.Buffer.size() << " Size# " << readContext.Size);
if (!readContext.Buffer && !offset) {
readContext.Buffer = std::move(blob.Buffer);
readContext.Buffer.resize(readContext.Size);
diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp
index 3c7fe68706a..72adcaa0906 100644
--- a/ydb/core/blob_depot/agent/request.cpp
+++ b/ydb/core/blob_depot/agent/request.cpp
@@ -44,6 +44,20 @@ namespace NKikimr::NBlobDepot {
ProcessResponse(id, std::move(context), std::move(response));
}
+ TString TRequestSender::ToString(const TResponse& response) {
+ auto printer = [](auto& value) -> TString {
+ using T = std::decay_t<decltype(value)>;
+ if constexpr (std::is_same_v<T, TTabletDisconnected>) {
+ return "TTabletDisconnected";
+ } else if constexpr (std::is_same_v<T, TKeyResolved>) {
+ return "TKeyResolved";
+ } else {
+ return value->ToString();
+ }
+ };
+ return std::visit(printer, response);
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TBlobDepotAgent machinery
@@ -59,8 +73,7 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId),
(Id, ev->Cookie), (Type, TypeName<TEvent>()));
- auto *event = ev->Get();
- OnRequestComplete(ev->Cookie, event, TabletRequestInFlight);
+ OnRequestComplete(ev->Cookie, ev->Get(), TabletRequestInFlight);
}
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev);
@@ -75,8 +88,7 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA16, "HandleOtherResponse", (VirtualGroupId, VirtualGroupId),
(Id, ev->Cookie), (Type, TypeName<TEvent>()));
- auto *event = ev->Get();
- OnRequestComplete(ev->Cookie, event, OtherRequestInFlight);
+ OnRequestComplete(ev->Cookie, ev->Get(), OtherRequestInFlight);
}
template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvGetResult::TPtr ev);
@@ -84,10 +96,11 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map) {
const auto it = map.find(id);
- Y_VERIFY(it != map.end());
- auto& [_, request] = *it;
- request.Sender->OnRequestComplete(id, std::move(response));
+ Y_VERIFY_S(it != map.end(), "id# " << id << " response# " << TRequestSender::ToString(response));
+ TRequestInFlight request = std::move(it->second);
map.erase(it);
+
+ request.Sender->OnRequestComplete(id, std::move(response));
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 43b4d5ceee1..b0d58eddae0 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -3,17 +3,43 @@
namespace NKikimr::NBlobDepot {
+ class TBlobDepot::TData::TTxIssueGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ const ui8 Channel;
+ const ui32 GroupId;
+ const TGenStep IssuedGenStep;
+ std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> CollectGarbage;
+
+ public:
+ TTxIssueGC(TBlobDepot *self, ui8 channel, ui32 groupId, TGenStep issuedGenStep,
+ std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage)
+ : TTransactionBase(self)
+ , Channel(channel)
+ , GroupId(groupId)
+ , IssuedGenStep(issuedGenStep)
+ , CollectGarbage(std::move(collectGarbage))
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ NIceDb::TNiceDb db(txc.DB);
+ db.Table<Schema::GC>().Key(Channel, GroupId).Update<Schema::GC::IssuedGenStep>(ui64(IssuedGenStep));
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ SendToBSProxy(Self->SelfId(), GroupId, CollectGarbage.release(), GroupId);
+ }
+ };
+
class TBlobDepot::TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
const ui8 Channel;
const ui32 GroupId;
std::vector<TLogoBlobID> TrashDeleted;
- const ui64 ConfirmedGenStep;
+ const TGenStep ConfirmedGenStep;
static constexpr ui32 MaxKeysToProcessAtOnce = 10'000;
public:
- TTxConfirmGC(TBlobDepot *self, ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted,
- ui64 confirmedGenStep)
+ TTxConfirmGC(TBlobDepot *self, ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, TGenStep confirmedGenStep)
: TTransactionBase(self)
, Channel(channel)
, GroupId(groupId)
@@ -29,8 +55,7 @@ namespace NKikimr::NBlobDepot {
}
if (TrashDeleted.size() <= MaxKeysToProcessAtOnce) {
TrashDeleted.clear();
- db.Table<Schema::ConfirmedGC>().Key(Channel, GroupId).Update<Schema::ConfirmedGC::ConfirmedGenStep>(
- ConfirmedGenStep);
+ db.Table<Schema::GC>().Key(Channel, GroupId).Update<Schema::GC::ConfirmedGenStep>(ui64(ConfirmedGenStep));
} else {
std::vector<TLogoBlobID> temp;
temp.insert(temp.end(), TrashDeleted.begin() + MaxKeysToProcessAtOnce, TrashDeleted.end());
@@ -81,10 +106,11 @@ namespace NKikimr::NBlobDepot {
OnTrashInserted(record);
}
- void TBlobDepot::TData::AddConfirmedGenStepOnLoad(ui8 channel, ui32 groupId, ui64 confirmedGenStep) {
+ void TBlobDepot::TData::AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep) {
const auto& key = std::make_tuple(Self->TabletID(), channel, groupId);
const auto [it, _] = RecordsPerChannelGroup.try_emplace(key, Self->TabletID(), channel, groupId);
auto& record = it->second;
+ record.IssuedGenStep = issuedGenStep;
record.LastConfirmedGenStep = confirmedGenStep;
}
@@ -176,7 +202,7 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(record.TabletId == Self->TabletID());
Y_VERIFY(!record.Trash.empty());
- ui64 nextGenStep = GenStep(*--record.Trash.end());
+ TGenStep nextGenStep(*--record.Trash.end());
Y_VERIFY(record.Channel < Self->Channels.size());
auto& channel = Self->Channels[record.Channel];
@@ -187,15 +213,15 @@ namespace NKikimr::NBlobDepot {
channel.GivenIdRanges.GetMinimumValue());
// step we are going to invalidate (including blobs with this one)
- const ui32 invalidatedStep = ui32(nextGenStep);
+ const ui32 invalidatedStep = nextGenStep.Step();
if (minBlobSeqId.Step <= invalidatedStep) {
const TLogoBlobID maxId(record.TabletId, generation, minBlobSeqId.Step, record.Channel, 0, 0);
const auto it = record.Trash.lower_bound(maxId);
if (it != record.Trash.begin()) {
- nextGenStep = GenStep(*std::prev(it));
+ nextGenStep = TGenStep(*std::prev(it));
} else {
- nextGenStep = 0;
+ nextGenStep = {};
}
// issue notifications to agents
@@ -224,14 +250,14 @@ namespace NKikimr::NBlobDepot {
auto doNotKeep = std::make_unique<TVector<TLogoBlobID>>();
// FIXME: check for blob leaks when LastConfirmedGenStep is not properly persisted
- for (auto it = record.Trash.begin(); it != record.Trash.end() && GenStep(*it) <= record.LastConfirmedGenStep; ++it) {
+ for (auto it = record.Trash.begin(); it != record.Trash.end() && TGenStep(*it) <= record.LastConfirmedGenStep; ++it) {
doNotKeep->push_back(*it);
}
// FIXME: check for blob loss when LastConfirmedGenStep is not properly persisted
- const TLogoBlobID keepFrom(record.TabletId, ui32(record.LastConfirmedGenStep >> 32),
- ui32(record.LastConfirmedGenStep), record.Channel, 0, 0);
- for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && GenStep(*it) <= nextGenStep; ++it) {
+ const TLogoBlobID keepFrom(record.TabletId, record.LastConfirmedGenStep.Generation(),
+ record.LastConfirmedGenStep.Step(), record.Channel, 0, 0);
+ for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && TGenStep(*it) <= nextGenStep; ++it) {
keep->push_back(*it);
}
@@ -248,7 +274,7 @@ namespace NKikimr::NBlobDepot {
}
auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(record.TabletId, generation,
- record.PerGenerationCounter, record.Channel, collect, ui32(nextGenStep >> 32), ui32(nextGenStep),
+ record.PerGenerationCounter, record.Channel, collect, nextGenStep.Generation(), nextGenStep.Step(),
keep.get(), doNotKeep.get(), TInstant::Max(), true);
keep.release();
doNotKeep.release();
@@ -256,22 +282,27 @@ namespace NKikimr::NBlobDepot {
record.CollectGarbageRequestInFlight = true;
record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0;
record.TrashInFlight.insert(record.TrashInFlight.end(), record.Trash.begin(), record.Trash.end());
- record.NextGenStep = Max(nextGenStep, record.LastConfirmedGenStep);
+ record.IssuedGenStep = Max(nextGenStep, record.LastConfirmedGenStep);
auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId);
- Y_VERIFY(record.LastConfirmedGenStep < GenStep(blobSeqId));
- if (GenStep(blobSeqId) <= nextGenStep) {
- blobSeqId.Step = ui32(nextGenStep) + 1;
+ Y_VERIFY(record.LastConfirmedGenStep < TGenStep(blobSeqId));
+ if (TGenStep(blobSeqId) <= nextGenStep) {
+ blobSeqId.Step = nextGenStep.Step() + 1;
blobSeqId.Index = 0;
channel.NextBlobSeqId = blobSeqId.ToSequentialNumber();
}
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT09, "issuing TEvCollectGarbage", (TabletId, Self->TabletID()),
(Channel, record.Channel), (GroupId, record.GroupId), (Msg, ev->ToString()),
- (LastConfirmedGenStep, record.LastConfirmedGenStep), (NextGenStep, record.NextGenStep),
+ (LastConfirmedGenStep, record.LastConfirmedGenStep), (IssuedGenStep, record.IssuedGenStep),
(TrashInFlight.size, record.TrashInFlight.size()));
- SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), record.GroupId);
+ if (collect) {
+ Self->Execute(std::make_unique<TTxIssueGC>(Self, record.Channel, record.GroupId, record.IssuedGenStep,
+ std::move(ev)));
+ } else {
+ SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), record.GroupId);
+ }
}
RecordsWithTrash.Clear();
@@ -301,7 +332,7 @@ namespace NKikimr::NBlobDepot {
for (const TLogoBlobID& id : record.TrashInFlight) { // make it merge
record.Trash.erase(id);
}
- record.LastConfirmedGenStep = record.NextGenStep;
+ record.LastConfirmedGenStep = record.IssuedGenStep;
Self->Execute(std::make_unique<TTxConfirmGC>(Self, record.Channel, record.GroupId,
std::exchange(record.TrashInFlight, {}), record.LastConfirmedGenStep));
} else {
@@ -316,6 +347,12 @@ namespace NKikimr::NBlobDepot {
const ui32 generation = Self->Executor()->Generation();
if (const auto it = agent.InvalidateStepRequests.find(ev->Cookie); it != agent.InvalidateStepRequests.end()) {
for (const auto& [channel, invalidatedStep] : it->second) {
+ const ui32 channel_ = channel;
+ const ui32 invalidatedStep_ = invalidatedStep;
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "Trim", (TabletId, Self->TabletID()), (AgentId, agent.ConnectedNodeId),
+ (Channel, channel_), (InvalidatedStep, invalidatedStep_),
+ (GivenIdRanges, Self->Channels[channel_].GivenIdRanges),
+ (Agent.GivenIdRanges, agent.GivenIdRanges[channel_]));
Self->Channels[channel].GivenIdRanges.Trim(channel, generation, invalidatedStep);
agent.GivenIdRanges[channel].Trim(channel, generation, invalidatedStep);
}
@@ -323,8 +360,13 @@ namespace NKikimr::NBlobDepot {
}
for (const auto& item : ev->Get()->Record.GetWritesInFlight()) {
const auto blobSeqId = TBlobSeqId::FromProto(item);
- Self->Channels[blobSeqId.Channel].GivenIdRanges.AddPoint(blobSeqId.ToSequentialNumber());
- agent.GivenIdRanges[blobSeqId.Channel].AddPoint(blobSeqId.ToSequentialNumber());
+ const ui64 value = blobSeqId.ToSequentialNumber();
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "WriteInFlight", (TabletId, Self->TabletID()), (AgentId, agent.ConnectedNodeId),
+ (BlobSeqId, blobSeqId), (Value, value),
+ (GivenIdRanges, Self->Channels[blobSeqId.Channel].GivenIdRanges),
+ (Agent.GivenIdRanges, agent.GivenIdRanges[blobSeqId.Channel]));
+ Self->Channels[blobSeqId.Channel].GivenIdRanges.AddPoint(value);
+ agent.GivenIdRanges[blobSeqId.Channel].AddPoint(value);
}
HandleTrash();
}
@@ -342,4 +384,9 @@ namespace NKikimr::NBlobDepot {
}
}
+ bool TBlobDepot::TData::CanBeCollected(ui32 groupId, TBlobSeqId id) const {
+ const auto it = RecordsPerChannelGroup.find(std::make_tuple(Self->TabletID(), id.Channel, groupId));
+ return it != RecordsPerChannelGroup.end() && TGenStep(id) <= it->second.IssuedGenStep;
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 3d092bd995c..8e8b4ba84e4 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -246,8 +246,8 @@ namespace NKikimr::NBlobDepot {
std::set<TLogoBlobID> Trash; // committed trash
std::vector<TLogoBlobID> TrashInFlight;
ui32 PerGenerationCounter = 1;
- ui64 LastConfirmedGenStep = 0;
- ui64 NextGenStep = 0;
+ TGenStep IssuedGenStep; // currently in flight or already confirmed
+ TGenStep LastConfirmedGenStep;
bool CollectGarbageRequestInFlight = false;
TRecordsPerChannelGroup(ui64 tabletId, ui8 channel, ui32 groupId)
@@ -264,6 +264,7 @@ namespace NKikimr::NBlobDepot {
THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed
+ class TTxIssueGC;
class TTxConfirmGC;
public:
@@ -307,7 +308,7 @@ namespace NKikimr::NBlobDepot {
void AddDataOnLoad(TKey key, TString value);
void AddTrashOnLoad(TLogoBlobID id);
- void AddConfirmedGenStepOnLoad(ui8 channel, ui32 groupId, ui64 confirmedGenStep);
+ void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep);
void PutKey(TKey key, TValue&& data);
@@ -320,6 +321,8 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev);
void OnCommitConfirmedGC(ui8 channel, ui32 groupId);
+ bool CanBeCollected(ui32 groupId, TBlobSeqId id) const;
+
static TString ToValueProto(const TValue& value);
template<typename TCallback>
diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp
index 7e69dfa76f1..85a083f17ae 100644
--- a/ydb/core/blob_depot/garbage_collection.cpp
+++ b/ydb/core/blob_depot/garbage_collection.cpp
@@ -7,7 +7,10 @@ namespace NKikimr::NBlobDepot {
class TBlobDepot::TBarrierServer::TTxCollectGarbage : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
std::optional<TString> Error;
- std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> Request;
+ TBarrier& Barrier;
+ std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle>& Request;
+ const ui64 TabletId;
+ const ui8 Channel;
int KeepIndex = 0;
int DoNotKeepIndex = 0;
ui32 NumKeysProcessed = 0;
@@ -16,10 +19,12 @@ namespace NKikimr::NBlobDepot {
static constexpr ui32 MaxKeysToProcessAtOnce = 10'000;
public:
- TTxCollectGarbage(TBlobDepot *self, std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> request,
- ui32 keepIndex = 0, ui32 doNotKeepIndex = 0)
+ TTxCollectGarbage(TBlobDepot *self, ui64 tabletId, ui8 channel, ui32 keepIndex = 0, ui32 doNotKeepIndex = 0)
: TTransactionBase(self)
- , Request(std::move(request))
+ , Barrier(Self->BarrierServer->Barriers[std::make_pair(tabletId, channel)])
+ , Request(Barrier.ProcessingQ.front())
+ , TabletId(tabletId)
+ , Channel(channel)
, KeepIndex(keepIndex)
, DoNotKeepIndex(doNotKeepIndex)
{}
@@ -42,9 +47,13 @@ namespace NKikimr::NBlobDepot {
auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(),
Error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(Error));
TActivationContext::Send(response.release());
+ Barrier.ProcessingQ.pop_front();
Self->Data->HandleTrash();
+ if (!Barrier.ProcessingQ.empty()) {
+ Self->Execute(std::make_unique<TTxCollectGarbage>(Self, TabletId, Channel));
+ }
} else {
- Self->Execute(std::make_unique<TTxCollectGarbage>(Self, std::move(Request), KeepIndex, DoNotKeepIndex));
+ Self->Execute(std::make_unique<TTxCollectGarbage>(Self, TabletId, Channel, KeepIndex, DoNotKeepIndex));
}
}
@@ -60,17 +69,23 @@ namespace NKikimr::NBlobDepot {
const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
auto& barriers = Self->BarrierServer->Barriers;
if (const auto it = barriers.find(key); it != barriers.end()) {
+ // extract existing barrier record
auto& barrier = it->second;
- const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter());
- const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep());
- ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
- if (recordGenStep < barrier.LastRecordGenStep) {
+ const auto barrierGenCounter = std::make_tuple(barrier.RecordGeneration, barrier.PerGenerationCounter);
+ const TGenStep barrierGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
+
+ // extract new parameters from protobuf
+ const auto genCounter = std::make_tuple(record.GetGeneration(), record.GetPerGenerationCounter());
+ const TGenStep collectGenStep(record.GetCollectGeneration(), record.GetCollectStep());
+
+ // validate them
+ if (genCounter < barrierGenCounter) {
Error = "record generation:counter is obsolete";
- } else if (recordGenStep == barrier.LastRecordGenStep) {
- if (currentGenStep != collectGenStep) {
+ } else if (genCounter == barrierGenCounter) {
+ if (barrierGenStep != collectGenStep) {
Error = "repeated command with different collect parameters received";
}
- } else if (collectGenStep < currentGenStep) {
+ } else if (collectGenStep < barrierGenStep) {
Error = "decreasing barrier";
}
}
@@ -133,18 +148,21 @@ namespace NKikimr::NBlobDepot {
const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
auto& barriers = Self->BarrierServer->Barriers;
auto& barrier = barriers[key];
- const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter());
- const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep());
- ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
- Y_VERIFY(barrier.LastRecordGenStep <= recordGenStep);
- barrier.LastRecordGenStep = recordGenStep;
- Y_VERIFY(currentGenStep <= collectGenStep);
- currentGenStep = collectGenStep;
+ auto barrierGenCounter = std::tie(barrier.RecordGeneration, barrier.PerGenerationCounter);
+ TGenStep& barrierGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
+
+ const auto genCounter = std::make_tuple(record.GetGeneration(), record.GetPerGenerationCounter());
+ const TGenStep collectGenStep(record.GetCollectGeneration(), record.GetCollectStep());
+ Y_VERIFY(barrierGenCounter <= genCounter);
+ barrierGenCounter = genCounter;
+ Y_VERIFY(barrierGenStep <= collectGenStep);
+ barrierGenStep = collectGenStep;
db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()).Update(
- NIceDb::TUpdate<Schema::Barriers::LastRecordGenStep>(recordGenStep),
- NIceDb::TUpdate<Schema::Barriers::Soft>(barrier.Soft),
- NIceDb::TUpdate<Schema::Barriers::Hard>(barrier.Hard)
+ NIceDb::TUpdate<Schema::Barriers::RecordGeneration>(std::get<0>(genCounter)),
+ NIceDb::TUpdate<Schema::Barriers::PerGenerationCounter>(std::get<1>(genCounter)),
+ NIceDb::TUpdate<Schema::Barriers::Soft>(ui64(barrier.Soft)),
+ NIceDb::TUpdate<Schema::Barriers::Hard>(ui64(barrier.Hard))
);
}
@@ -152,30 +170,34 @@ namespace NKikimr::NBlobDepot {
}
};
- void TBlobDepot::TBarrierServer::AddBarrierOnLoad(ui64 tabletId, ui8 channel, ui64 lastRecordGenStep,
- ui64 soft, ui64 hard) {
+ void TBlobDepot::TBarrierServer::AddBarrierOnLoad(ui64 tabletId, ui8 channel, ui32 recordGeneration, ui32 perGenerationCounter,
+ TGenStep soft, TGenStep hard) {
Barriers[std::make_pair(tabletId, channel)] = {
- .LastRecordGenStep = lastRecordGenStep,
+ .RecordGeneration = recordGeneration,
+ .PerGenerationCounter = perGenerationCounter,
.Soft = soft,
.Hard = hard,
};
}
void TBlobDepot::TBarrierServer::Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) {
- Self->Execute(std::make_unique<TTxCollectGarbage>(Self,
- std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle>(ev.Release())));
+ const auto& record = ev->Get()->Record;
+ const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
+ auto& barrier = Barriers[key];
+ barrier.ProcessingQ.emplace_back(ev.Release());
+ if (barrier.ProcessingQ.size() == 1) {
+ Self->Execute(std::make_unique<TTxCollectGarbage>(Self, record.GetTabletId(), record.GetChannel()));
+ }
}
bool TBlobDepot::TBarrierServer::CheckBlobForBarrier(TLogoBlobID id) const {
- const auto key = std::make_pair(id.TabletID(), id.Channel());
- const auto it = Barriers.find(key);
- return it == Barriers.end() || GenStep(id) > Max(it->second.Soft, it->second.Hard);
+ const auto it = Barriers.find(std::make_pair(id.TabletID(), id.Channel()));
+ return it == Barriers.end() || TGenStep(id) > Max(it->second.Soft, it->second.Hard);
}
void TBlobDepot::TBarrierServer::GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const {
- const auto key = std::make_pair(id.TabletID(), id.Channel());
- const auto it = Barriers.find(key);
- const ui64 genStep = GenStep(id);
+ const auto it = Barriers.find(std::make_pair(id.TabletID(), id.Channel()));
+ const TGenStep genStep(id);
*underSoft = it == Barriers.end() ? false : genStep <= it->second.Soft;
*underHard = it == Barriers.end() ? false : genStep <= it->second.Hard;
}
diff --git a/ydb/core/blob_depot/garbage_collection.h b/ydb/core/blob_depot/garbage_collection.h
index c6c6a4ac2b8..3164828fda3 100644
--- a/ydb/core/blob_depot/garbage_collection.h
+++ b/ydb/core/blob_depot/garbage_collection.h
@@ -9,9 +9,11 @@ namespace NKikimr::NBlobDepot {
TBlobDepot* const Self;
struct TBarrier {
- ui64 LastRecordGenStep = 0;
- ui64 Soft = 0;
- ui64 Hard = 0;
+ ui32 RecordGeneration = 0;
+ ui32 PerGenerationCounter = 0;
+ TGenStep Soft;
+ TGenStep Hard;
+ std::deque<std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle>> ProcessingQ;
};
THashMap<std::pair<ui64, ui8>, TBarrier> Barriers;
@@ -24,7 +26,8 @@ namespace NKikimr::NBlobDepot {
: Self(self)
{}
- void AddBarrierOnLoad(ui64 tabletId, ui8 channel, ui64 lastRecordGenStep, ui64 soft, ui64 hard);
+ void AddBarrierOnLoad(ui64 tabletId, ui8 channel, ui32 recordGeneration, ui32 perGenerationCounter, TGenStep soft,
+ TGenStep hard);
void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev);
bool CheckBlobForBarrier(TLogoBlobID id) const;
void GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const;
@@ -32,9 +35,7 @@ namespace NKikimr::NBlobDepot {
template<typename TCallback>
void Enumerate(TCallback&& callback) {
for (const auto& [key, value] : Barriers) {
- callback(key.first, key.second, static_cast<ui32>(value.LastRecordGenStep >> 32),
- static_cast<ui32>(value.LastRecordGenStep), static_cast<ui32>(value.Soft >> 32),
- static_cast<ui32>(value.Soft), static_cast<ui32>(value.Hard >> 32), static_cast<ui32>(value.Hard));
+ callback(key.first, key.second, value.RecordGeneration, value.PerGenerationCounter, value.Soft, value.Hard);
}
}
};
diff --git a/ydb/core/blob_depot/given_id_range.cpp b/ydb/core/blob_depot/given_id_range.cpp
index 91864653753..74cd9a4e8bd 100644
--- a/ydb/core/blob_depot/given_id_range.cpp
+++ b/ydb/core/blob_depot/given_id_range.cpp
@@ -4,8 +4,10 @@ namespace NKikimr::NBlobDepot {
void TGivenIdRange::IssueNewRange(ui64 begin, ui64 end) {
Y_VERIFY(begin < end);
+
const auto [it, inserted] = Ranges.emplace(begin, end);
Y_VERIFY(inserted);
+
if (it != Ranges.begin()) {
const auto& prev = *std::prev(it);
Y_VERIFY(prev.End <= begin);
@@ -14,6 +16,7 @@ namespace NKikimr::NBlobDepot {
const auto& next = *std::next(it);
Y_VERIFY(end <= next.Begin);
}
+
NumAvailableItems += end - begin;
}
@@ -22,16 +25,9 @@ namespace NKikimr::NBlobDepot {
}
void TGivenIdRange::RemovePoint(ui64 value) {
- const auto it = Ranges.upper_bound(value);
+ auto it = Ranges.upper_bound(value);
Y_VERIFY(it != Ranges.begin());
- auto& range = const_cast<TRange&>(*std::prev(it));
- Y_VERIFY(range.Begin <= value && value < range.End);
- Y_VERIFY(range.Bits[value - range.Begin]);
- range.Bits.Reset(value - range.Begin);
- if (range.Bits.Empty()) {
- Ranges.erase(it);
- }
- --NumAvailableItems;
+ Pop(std::prev(it), value);
}
bool TGivenIdRange::IsEmpty() const {
@@ -45,23 +41,15 @@ namespace NKikimr::NBlobDepot {
ui64 TGivenIdRange::GetMinimumValue() const {
Y_VERIFY(!Ranges.empty());
const auto& range = *Ranges.begin();
+ Y_VERIFY(range.NumSetBits);
size_t offset = range.Bits.FirstNonZeroBit();
Y_VERIFY(offset != range.Bits.Size());
return range.Begin + offset;
}
ui64 TGivenIdRange::Allocate() {
- Y_VERIFY(!Ranges.empty());
- const auto it = Ranges.begin();
- auto& range = const_cast<TRange&>(*it);
- size_t offset = range.Bits.FirstNonZeroBit();
- Y_VERIFY(offset != range.Bits.Size());
- range.Bits.Reset(offset);
- const ui64 value = range.Begin + offset;
- if (range.Bits.Empty()) {
- Ranges.erase(it);
- }
- --NumAvailableItems;
+ const ui64 value = GetMinimumValue();
+ Pop(Ranges.begin(), value);
return value;
}
@@ -77,6 +65,7 @@ namespace NKikimr::NBlobDepot {
} else if (range.Begin < validSince) {
const ui32 len = validSince - range.Begin;
for (ui32 i = 0; i < len; ++i) {
+ range.NumSetBits -= range.Bits[i];
NumAvailableItems -= range.Bits[i];
}
range.Bits.Reset(0, len);
@@ -91,8 +80,9 @@ namespace NKikimr::NBlobDepot {
const auto it = Ranges.find(range.Begin);
Y_VERIFY(it != Ranges.end());
Y_VERIFY(range.End == it->End);
+ Y_VERIFY(range.NumSetBits == it->NumSetBits);
Y_VERIFY(range.Bits == it->Bits);
- NumAvailableItems -= range.Bits.Count();
+ NumAvailableItems -= range.NumSetBits;
Ranges.erase(it);
}
}
@@ -103,11 +93,10 @@ namespace NKikimr::NBlobDepot {
if (it != Ranges.begin()) {
s << " ";
}
- s << it->Begin << "-" << it->End << "[";
+ s << "[" << it->Begin << "," << it->End << "):";
for (ui32 i = 0, count = it->End - it->Begin; i < count; ++i) {
s << int(it->Bits[i]);
}
- s << "]";
}
s << "}";
}
@@ -118,4 +107,17 @@ namespace NKikimr::NBlobDepot {
return s.Str();
}
+ void TGivenIdRange::Pop(TRanges::iterator it, ui64 value) {
+ TRange& range = const_cast<TRange&>(*it);
+ Y_VERIFY(range.Begin <= value && value < range.End);
+ const size_t offset = value - range.Begin;
+ Y_VERIFY(range.Bits[offset]);
+ range.Bits.Reset(offset);
+ --range.NumSetBits;
+ if (!range.NumSetBits) {
+ Ranges.erase(it);
+ }
+ --NumAvailableItems;
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp
index f32db510828..7f5544420e1 100644
--- a/ydb/core/blob_depot/mon_main.cpp
+++ b/ydb/core/blob_depot/mon_main.cpp
@@ -198,14 +198,14 @@ namespace NKikimr::NBlobDepot {
TABLEH() { Stream << "soft"; }
TABLEH() { Stream << "hard"; }
} else {
- Self->BarrierServer->Enumerate([&](ui64 tabletId, ui8 channel, ui32 recordGen, ui32 recordCounter,
- ui32 softGen, ui32 softStep, ui32 hardGen, ui32 hardStep) {
+ Self->BarrierServer->Enumerate([&](ui64 tabletId, ui8 channel, ui32 recordGen, ui32 perGenerationCounter,
+ TGenStep soft, TGenStep hard) {
TABLER() {
TABLED() { Stream << tabletId; }
TABLED() { Stream << int(channel); }
- TABLED() { Stream << recordGen << ":" << recordCounter; }
- TABLED() { Stream << softGen << ":" << softStep; }
- TABLED() { Stream << hardGen << ":" << hardStep; }
+ TABLED() { Stream << recordGen << ":" << perGenerationCounter; }
+ TABLED() { soft.Output(Stream); }
+ TABLED() { hard.Output(Stream); }
}
});
}
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index 1c65c6a19cc..bd85b9eda49 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -23,6 +23,7 @@ namespace NKikimr::NBlobDepot {
std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId());
TAgent& agent = Self->GetAgent(Request->Recipient);
+ const ui32 generation = Self->Executor()->Generation();
for (const auto& item : Request->Get()->Record.GetItems()) {
auto *responseItem = responseRecord->AddItems();
@@ -36,7 +37,20 @@ namespace NKikimr::NBlobDepot {
auto *locator = chain->MutableLocator();
locator->CopyFrom(item.GetBlobLocator());
- MarkGivenIdCommitted(agent, TBlobSeqId::FromProto(locator->GetBlobSeqId()));
+ const auto blobSeqId = TBlobSeqId::FromProto(locator->GetBlobSeqId());
+ const bool canBeCollected = Self->Data->CanBeCollected(locator->GetGroupId(), blobSeqId);
+
+ if (blobSeqId.Generation == generation) {
+ // check for internal sanity -- we can't issue barriers on given ids without confirmed trimming
+ Y_VERIFY(!canBeCollected);
+ } else if (canBeCollected) {
+ // we can't accept this record, because it is potentially under already issued barrier
+ responseItem->SetStatus(NKikimrProto::ERROR);
+ responseItem->SetErrorReason("generation race");
+ continue;
+ }
+
+ MarkGivenIdCommitted(agent, blobSeqId);
if (!CheckKeyAgainstBarrier(item.GetKey(), responseItem)) {
continue;
@@ -63,11 +77,13 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(blobSeqId.Generation == Self->Executor()->Generation());
Y_VERIFY(blobSeqId.Channel < Self->Channels.size());
- auto& channel = Self->Channels[blobSeqId.Channel];
-
const ui64 value = blobSeqId.ToSequentialNumber();
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "MarkGivenIdCommitted", (TabletId, Self->TabletID()),
+ (AgentId, agent.ConnectedNodeId), (BlobSeqId, blobSeqId), (Value, value),
+ (GivenIdRanges, Self->Channels[blobSeqId.Channel].GivenIdRanges),
+ (Agent.GivenIdRanges, agent.GivenIdRanges[blobSeqId.Channel]));
agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value);
- channel.GivenIdRanges.RemovePoint(value);
+ Self->Channels[blobSeqId.Channel].GivenIdRanges.RemovePoint(value);
}
bool CheckKeyAgainstBarrier(const TString& key, NKikimrBlobDepot::TEvCommitBlobSeqResult::TItem *responseItem) {
diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp
index c15641b18d3..b2efbaeac2a 100644
--- a/ydb/core/blob_depot/op_load.cpp
+++ b/ydb/core/blob_depot/op_load.cpp
@@ -65,9 +65,10 @@ namespace NKikimr::NBlobDepot {
Self->BarrierServer->AddBarrierOnLoad(
table.GetValue<Schema::Barriers::TabletId>(),
table.GetValue<Schema::Barriers::Channel>(),
- table.GetValue<Schema::Barriers::LastRecordGenStep>(),
- table.GetValue<Schema::Barriers::Soft>(),
- table.GetValue<Schema::Barriers::Hard>()
+ table.GetValue<Schema::Barriers::RecordGeneration>(),
+ table.GetValue<Schema::Barriers::PerGenerationCounter>(),
+ TGenStep(table.GetValue<Schema::Barriers::Soft>()),
+ TGenStep(table.GetValue<Schema::Barriers::Hard>())
);
if (!table.Next()) {
return false;
@@ -107,16 +108,18 @@ namespace NKikimr::NBlobDepot {
}
}
- // ConfirmedGC
+ // GC
{
- auto table = db.Table<Schema::ConfirmedGC>().Select();
+ using T = Schema::GC;
+ auto table = db.Table<T>().Select();
if (!table.IsReady()) {
return false;
}
while (table.IsValid()) {
- Self->Data->AddConfirmedGenStepOnLoad(table.GetValue<Schema::ConfirmedGC::Channel>(),
- table.GetValue<Schema::ConfirmedGC::GroupId>(),
- table.GetValue<Schema::ConfirmedGC::ConfirmedGenStep>());
+ Self->Data->AddGenStepOnLoad(table.GetValue<T::Channel>(),
+ table.GetValue<T::GroupId>(),
+ TGenStep(table.GetValueOrDefault<T::IssuedGenStep>()),
+ TGenStep(table.GetValueOrDefault<T::ConfirmedGenStep>()));
if (!table.Next()) {
return false;
}
@@ -132,7 +135,7 @@ namespace NKikimr::NBlobDepot {
auto barriers = db.Table<Schema::Barriers>().Select();
auto data = db.Table<Schema::Data>().Select();
auto trash = db.Table<Schema::Trash>().Select();
- auto confirmedGC = db.Table<Schema::ConfirmedGC>().Select();
+ auto confirmedGC = db.Table<Schema::GC>().Select();
return config.IsReady() && blocks.IsReady() && barriers.IsReady() && data.IsReady() && trash.IsReady() &&
confirmedGC.IsReady();
}
diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h
index 057f91cad17..52a3f157503 100644
--- a/ydb/core/blob_depot/schema.h
+++ b/ydb/core/blob_depot/schema.h
@@ -40,15 +40,17 @@ namespace NKikimr::NBlobDepot {
struct Barriers : Table<3> {
struct TabletId : Column<1, NScheme::NTypeIds::Uint64> {};
struct Channel : Column<2, NScheme::NTypeIds::Uint8> {};
- struct LastRecordGenStep : Column<3, NScheme::NTypeIds::Uint64> {};
- struct Soft : Column<4, NScheme::NTypeIds::Uint64> {};
- struct Hard : Column<5, NScheme::NTypeIds::Uint64> {};
+ struct RecordGeneration : Column<3, NScheme::NTypeIds::Uint32> {};
+ struct PerGenerationCounter : Column<4, NScheme::NTypeIds::Uint32> {};
+ struct Soft : Column<5, NScheme::NTypeIds::Uint64> {};
+ struct Hard : Column<6, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<TabletId, Channel>;
using TColumns = TableColumns<
TabletId,
Channel,
- LastRecordGenStep,
+ RecordGeneration,
+ PerGenerationCounter,
Soft,
Hard
>;
@@ -75,13 +77,14 @@ namespace NKikimr::NBlobDepot {
using TColumns = TableColumns<BlobId>;
};
- struct ConfirmedGC : Table<6> {
+ struct GC : Table<6> {
struct Channel : Column<1, NScheme::NTypeIds::Uint8> {};
struct GroupId : Column<2, NScheme::NTypeIds::Uint32> {};
- struct ConfirmedGenStep : Column<3, NScheme::NTypeIds::Uint64> {};
+ struct IssuedGenStep : Column<3, NScheme::NTypeIds::Uint64> { static constexpr Type Default = 0; };
+ struct ConfirmedGenStep : Column<4, NScheme::NTypeIds::Uint64> { static constexpr Type Default = 0; };
using TKey = TableKey<Channel, GroupId>;
- using TColumns = TableColumns<Channel, GroupId, ConfirmedGenStep>;
+ using TColumns = TableColumns<Channel, GroupId, IssuedGenStep, ConfirmedGenStep>;
};
using TTables = SchemaTables<
@@ -90,7 +93,7 @@ namespace NKikimr::NBlobDepot {
Barriers,
Data,
Trash,
- ConfirmedGC
+ GC
>;
using TSettings = SchemaSettings<
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index 147c61a6f62..8cf0e3899ab 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -15,7 +15,6 @@ namespace NKikimr::NBlobDepot {
struct TVirtualGroupBlobFooter {
TLogoBlobID StoredBlobId;
};
-
#pragma pack(pop)
static constexpr ui32 MaxBlobSize = 10 << 20; // 10 MB BlobStorage hard limit
@@ -101,11 +100,13 @@ namespace NKikimr::NBlobDepot {
struct TRange {
const ui64 Begin;
const ui64 End;
+ ui32 NumSetBits = 0;
TDynBitMap Bits;
TRange(ui64 begin, ui64 end)
: Begin(begin)
, End(end)
+ , NumSetBits(end - begin)
{
Bits.Set(0, end - begin);
}
@@ -118,7 +119,8 @@ namespace NKikimr::NBlobDepot {
};
};
- std::set<TRange, TRange::TCompare> Ranges;
+ using TRanges = std::set<TRange, TRange::TCompare>; // FIXME: deque?
+ TRanges Ranges;
ui32 NumAvailableItems = 0;
public:
@@ -137,6 +139,9 @@ namespace NKikimr::NBlobDepot {
void Output(IOutputStream& s) const;
TString ToString() const;
+
+ private:
+ void Pop(TRanges::iterator it, ui64 value);
};
using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>;
@@ -156,16 +161,57 @@ namespace NKikimr::NBlobDepot {
}
}
- inline ui64 GenStep(ui32 gen, ui32 step) {
- return static_cast<ui64>(gen) << 32 | step;
- }
+ class TGenStep {
+ ui64 Value = 0;
- inline ui64 GenStep(TLogoBlobID id) {
- return GenStep(id.Generation(), id.Step());
- }
+ public:
+ TGenStep() = default;
+ TGenStep(const TGenStep&) = default;
- inline ui64 GenStep(TBlobSeqId id) {
- return GenStep(id.Generation, id.Step);
- }
+ explicit TGenStep(ui64 value)
+ : Value(value)
+ {}
+
+ TGenStep(ui32 gen, ui32 step)
+ : Value(ui64(gen) << 32 | step)
+ {}
+
+ explicit TGenStep(const TLogoBlobID& id)
+ : TGenStep(id.Generation(), id.Step())
+ {}
+
+ explicit TGenStep(const TBlobSeqId& id)
+ : TGenStep(id.Generation, id.Step)
+ {}
+
+ explicit operator ui64() const {
+ return Value;
+ }
+
+ ui32 Generation() const {
+ return Value >> 32;
+ }
+
+ ui32 Step() const {
+ return Value;
+ }
+
+ void Output(IOutputStream& s) const {
+ s << Generation() << ":" << Step();
+ }
+
+ TString ToString() const {
+ TStringStream s;
+ Output(s);
+ return s.Str();
+ }
+
+ friend bool operator ==(const TGenStep& x, const TGenStep& y) { return x.Value == y.Value; }
+ friend bool operator !=(const TGenStep& x, const TGenStep& y) { return x.Value != y.Value; }
+ friend bool operator < (const TGenStep& x, const TGenStep& y) { return x.Value < y.Value; }
+ friend bool operator <=(const TGenStep& x, const TGenStep& y) { return x.Value <= y.Value; }
+ friend bool operator > (const TGenStep& x, const TGenStep& y) { return x.Value > y.Value; }
+ friend bool operator >=(const TGenStep& x, const TGenStep& y) { return x.Value >= y.Value; }
+ };
} // NKikimr::NBlobDepot