aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-12-30 19:51:48 +0300
committeralexvru <alexvru@ydb.tech>2022-12-30 19:51:48 +0300
commite8ac1f63d70588f605a4b93b64e6c07931702531 (patch)
tree892e9a05bb552e496c0864b4061f4419b90d14df
parente91537d589eda3fbbaaa9c6bb035f8d9856cb5a1 (diff)
downloadydb-e8ac1f63d70588f605a4b93b64e6c07931702531.tar.gz
Fix BlobDepot group assimilation
-rw-r--r--ydb/core/blob_depot/agent.cpp143
-rw-r--r--ydb/core/blob_depot/assimilator.cpp144
-rw-r--r--ydb/core/blob_depot/assimilator.h8
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp126
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h22
-rw-r--r--ydb/core/blob_depot/data.cpp44
-rw-r--r--ydb/core/blob_depot/data.h17
-rw-r--r--ydb/core/blob_depot/data_resolve.cpp225
-rw-r--r--ydb/core/blob_depot/defs.h1
-rw-r--r--ydb/core/blob_depot/space_monitor.cpp1
-rw-r--r--ydb/core/blob_depot/types.h17
-rw-r--r--ydb/core/mind/bscontroller/impl.h2
-rw-r--r--ydb/core/mind/bscontroller/load_everything.cpp3
-rw-r--r--ydb/core/protos/counters_blob_depot.proto11
14 files changed, 439 insertions, 325 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index b1739b56d40..edc014ed8fc 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -33,14 +33,6 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) {
- if (!Configured || (Config.GetIsDecommittingGroup() && DecommitState < EDecommitState::BlocksFinished)) {
- const auto it = PipeServers.find(ev->Recipient);
- Y_VERIFY(it != PipeServers.end());
- it->second.PostponeFromAgent = true;
- it->second.PostponeQ.emplace_back(ev.Release());
- return;
- }
-
const ui32 nodeId = ev->Sender.NodeId();
const TActorId& pipeServerId = ev->Recipient;
const auto& req = ev->Get()->Record;
@@ -142,66 +134,35 @@ namespace NKikimr::NBlobDepot {
const ui32 generation = Executor()->Generation();
auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, ev->Get()->Record.GetChannelKind(), generation);
- if (const auto it = ChannelKinds.find(record->GetChannelKind()); it != ChannelKinds.end()) {
- auto& kind = it->second;
- auto *givenIdRange = record->MutableGivenIdRange();
-
- struct TGroupInfo {
- std::vector<TChannelInfo*> Channels;
- };
- std::unordered_map<ui32, TGroupInfo> groups;
-
- for (const auto& [channel, groupId] : kind.ChannelGroups) {
- Y_VERIFY_DEBUG(channel < Channels.size() && Channels[channel].ChannelKind == it->first);
- groups[groupId].Channels.push_back(&Channels[channel]);
- }
+ auto *givenIdRange = record->MutableGivenIdRange();
- std::vector<std::tuple<ui64, const TGroupInfo*>> options;
+ std::vector<ui8> channels(ev->Get()->Record.GetCount());
+ PickChannels(record->GetChannelKind(), channels);
- ui64 accum = 0;
- for (const auto& [groupId, group] : groups) {
- if (const ui64 w = SpaceMonitor->GetGroupAllocationWeight(groupId)) {
- accum += w;
- options.emplace_back(accum, &group);
- }
- }
+ THashMap<ui8, NKikimrBlobDepot::TGivenIdRange::TChannelRange*> issuedRanges;
+ for (ui8 channelIndex : channels) {
+ TChannelInfo& channel = Channels[channelIndex];
+ const ui64 value = channel.NextBlobSeqId++;
- if (accum) {
- THashMap<ui8, NKikimrBlobDepot::TGivenIdRange::TChannelRange*> issuedRanges;
- for (ui32 i = 0, count = ev->Get()->Record.GetCount(); i < count; ++i) {
- const ui64 selection = RandomNumber(accum);
- const auto it = std::upper_bound(options.begin(), options.end(), selection,
- [](ui64 x, const auto& y) { return x < std::get<0>(y); });
- const auto& [_, group] = *it;
-
- const size_t channelIndex = RandomNumber(group->Channels.size());
- TChannelInfo* const channel = group->Channels[channelIndex];
-
- const ui64 value = channel->NextBlobSeqId++;
-
- // fill in range item
- auto& range = issuedRanges[channel->Index];
- if (!range || range->GetEnd() != value) {
- range = givenIdRange->AddChannelRanges();
- range->SetChannel(channel->Index);
- range->SetBegin(value);
- }
- range->SetEnd(value + 1);
- }
- } else {
- Y_VERIFY_DEBUG(false); // TODO(alexvru): handle this situation somehow -- agent needs to retry this query?
+ // fill in range item
+ auto& range = issuedRanges[channelIndex];
+ if (!range || range->GetEnd() != value) {
+ range = givenIdRange->AddChannelRanges();
+ range->SetChannel(channelIndex);
+ range->SetBegin(value);
}
+ range->SetEnd(value + 1);
+ }
- // register issued ranges in agent and global records
- TAgent& agent = GetAgent(ev->Recipient);
- for (const auto& range : givenIdRange->GetChannelRanges()) {
- agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd());
- Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd());
+ // register issued ranges in agent and global records
+ TAgent& agent = GetAgent(ev->Recipient);
+ for (const auto& range : givenIdRange->GetChannelRanges()) {
+ agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd());
+ Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd());
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "IssueNewRange", (Id, GetLogId()),
- (AgentId, agent.Connection->NodeId), (Channel, range.GetChannel()),
- (Begin, range.GetBegin()), (End, range.GetEnd()));
- }
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "IssueNewRange", (Id, GetLogId()),
+ (AgentId, agent.Connection->NodeId), (Channel, range.GetChannel()),
+ (Begin, range.GetBegin()), (End, range.GetEnd()));
}
TActivationContext::Send(response.release());
@@ -246,46 +207,6 @@ namespace NKikimr::NBlobDepot {
agent.InvalidatedStepInFlight.clear();
}
- void TBlobDepot::InitChannelKinds() {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT07, "InitChannelKinds", (Id, GetLogId()));
-
- TTabletStorageInfo *info = Info();
- const ui32 generation = Executor()->Generation();
-
- Y_VERIFY(Channels.empty());
-
- ui32 channel = 0;
- for (const auto& profile : Config.GetChannelProfiles()) {
- for (ui32 i = 0, count = profile.GetCount(); i < count; ++i, ++channel) {
- if (channel >= 2) {
- const auto kind = profile.GetChannelKind();
- auto& p = ChannelKinds[kind];
- p.ChannelToIndex[channel] = p.ChannelGroups.size();
- p.ChannelGroups.emplace_back(channel, info->GroupFor(channel, generation));
- Channels.push_back({
- ui8(channel),
- kind,
- &p,
- {},
- TBlobSeqId{channel, generation, 1, 0}.ToSequentialNumber(),
- {},
- {},
- });
- } else {
- Channels.push_back({
- ui8(channel),
- NKikimrBlobDepot::TChannelKind::System,
- nullptr,
- {},
- 0,
- {},
- {},
- });
- }
- }
- }
- }
-
void TBlobDepot::Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
TAgent& agent = GetAgent(ev->Recipient);
if (const auto it = agent.PushCallbacks.find(ev->Get()->Record.GetId()); it != agent.PushCallbacks.end()) {
@@ -296,19 +217,19 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepot::ProcessRegisterAgentQ() {
- if (!Configured || (Config.GetIsDecommittingGroup() && DecommitState < EDecommitState::BlocksFinished)) {
+ if (!ReadyForAgentQueries()) {
return;
}
for (auto& [pipeServerId, info] : PipeServers) {
- if (info.PostponeFromAgent) {
- info.PostponeFromAgent = false;
- for (auto& ev : std::exchange(info.PostponeQ, {})) {
- Y_VERIFY(ev->Cookie == info.NextExpectedMsgId);
- ++info.NextExpectedMsgId;
-
- TAutoPtr<IEventHandle> tmp(ev.release());
- HandleFromAgent(tmp);
+ if (info.ProcessThroughQueue) {
+ if (info.PostponeQ.empty()) {
+ info.ProcessThroughQueue = false;
+ } else {
+ for (auto& ev : std::exchange(info.PostponeQ, {})) {
+ TActivationContext::Send(ev.release());
+ }
+ TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessRegisterAgentQ, 0, SelfId(), {}, nullptr, 0));
}
}
}
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp
index 7af0565cf99..ef444d09d28 100644
--- a/ydb/core/blob_depot/assimilator.cpp
+++ b/ydb/core/blob_depot/assimilator.cpp
@@ -16,6 +16,51 @@ namespace NKikimr::NBlobDepot {
};
};
+ class TAssimilator::TTxCommitAssimilatedBlob : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ const TActorId AssimilatorId;
+ const NKikimrProto::EReplyStatus Status;
+ const TBlobSeqId BlobSeqId;
+ const TData::TKey Key;
+
+ public:
+ TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_COMMIT_ASSIMILATED_BLOB; }
+
+ TTxCommitAssimilatedBlob(TAssimilator *self, NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId, TData::TKey key)
+ : TTransactionBase(self->Self)
+ , AssimilatorId(self->SelfId())
+ , Status(status)
+ , BlobSeqId(blobSeqId)
+ , Key(std::move(key))
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ if (Status == NKikimrProto::OK) {
+ Y_VERIFY(!Self->Data->CanBeCollected(BlobSeqId));
+ Self->Data->BindToBlob(Key, BlobSeqId, txc, this);
+ } else if (Status == NKikimrProto::NODATA) {
+ if (const TData::TValue *value = Self->Data->FindKey(Key); value && value->ValueChain.empty()) {
+ Self->Data->DeleteKey(Key, txc, this);
+ }
+ }
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ if (BlobSeqId) {
+ TChannelInfo& channel = Self->Channels[BlobSeqId.Channel];
+ const ui32 generation = Self->Executor()->Generation();
+ const TBlobSeqId leastExpectedBlobIdBefore = channel.GetLeastExpectedBlobId(generation);
+ const size_t numErased = channel.AssimilatedBlobsInFlight.erase(BlobSeqId.ToSequentialNumber());
+ Y_VERIFY(numErased == 1);
+ if (leastExpectedBlobIdBefore != channel.GetLeastExpectedBlobId(generation)) {
+ Self->Data->OnLeastExpectedBlobIdChange(channel.Index); // allow garbage collection
+ }
+ }
+ Self->Data->CommitTrash(this);
+ TActivationContext::Send(new IEventHandle(TEvPrivate::EvTxComplete, 0, AssimilatorId, {}, nullptr, 0));
+ }
+ };
+
void TAssimilator::Bootstrap() {
if (Token.expired()) {
return PassAway();
@@ -42,7 +87,9 @@ namespace NKikimr::NBlobDepot {
}
void TAssimilator::PassAway() {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT52, "TAssimilator::PassAway", (Id, Self->GetLogId()));
+ if (!Token.expired()) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT52, "TAssimilator::PassAway", (Id, Self->GetLogId()));
+ }
TActorBootstrapped::PassAway();
}
@@ -56,7 +103,6 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(TEvBlobStorage::TEvGetResult, Handle);
hFunc(TEvBlobStorage::TEvPutResult, Handle);
- hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
hFunc(TEvTabletPipe::TEvClientConnected, Handle);
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
hFunc(TEvBlobStorage::TEvControllerGroupDecommittedResponse, Handle);
@@ -222,16 +268,7 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT54, "TAssimilator::ScanDataForCopying", (Id, Self->GetLogId()),
(LastScannedKey, LastScannedKey));
- TData::TKey lastScannedKey;
- if (LastScannedKey) {
- lastScannedKey = TData::TKey(*LastScannedKey);
- }
-
- struct TScanQueueItem {
- TLogoBlobID Key;
- TLogoBlobID OriginalBlobId;
- };
- std::deque<TScanQueueItem> scanQ;
+ std::deque<TLogoBlobID> scanQ;
ui32 totalSize = 0;
THPTimer timer;
ui32 numItems = 0;
@@ -243,13 +280,13 @@ namespace NKikimr::NBlobDepot {
return false;
}
}
- if (!value.OriginalBlobId) {
- LastScannedKey.emplace(key.GetBlobId());
+ const TLogoBlobID& id = key.GetBlobId();
+ if (!value.ValueChain.empty()) {
+ LastScannedKey.emplace(id);
return true; // keep scanning
- } else if (const TLogoBlobID& id = *value.OriginalBlobId; scanQ.empty() ||
- scanQ.front().OriginalBlobId.TabletID() == id.TabletID()) {
- LastScannedKey.emplace(key.GetBlobId());
- scanQ.push_back({.Key = *LastScannedKey, .OriginalBlobId = id});
+ } else if (scanQ.empty() || scanQ.front().TabletID() == id.TabletID()) {
+ LastScannedKey.emplace(id);
+ scanQ.push_back(id);
totalSize += id.BlobSize();
EntriesToProcess = true;
return totalSize < MaxSizeToQuery;
@@ -259,6 +296,7 @@ namespace NKikimr::NBlobDepot {
};
// FIXME: reentrable as it shares mailbox with the BlobDepot tablet itself
+ TData::TKey lastScannedKey = LastScannedKey ? TData::TKey(*LastScannedKey) : TData::TKey();
const bool finished = Self->Data->ScanRange(LastScannedKey ? &lastScannedKey : nullptr, nullptr, {}, callback);
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT56, "ScanDataForCopying done", (Id, Self->GetLogId()),
@@ -270,8 +308,8 @@ namespace NKikimr::NBlobDepot {
const ui32 sz = scanQ.size();
TArrayHolder<TQuery> queries(new TQuery[sz]);
TQuery *query = queries.Get();
- for (const TScanQueueItem& item : scanQ) {
- query->Set(item.OriginalBlobId);
+ for (const TLogoBlobID& id : scanQ) {
+ query->Set(id);
++query;
}
auto ev = std::make_unique<TEvBlobStorage::TEvGet>(queries, sz, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead);
@@ -289,46 +327,28 @@ namespace NKikimr::NBlobDepot {
}
void TAssimilator::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
- class TTxDropBlobIfNoData : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- const TLogoBlobID Id;
- const TActorId AssimilatorId;
-
- public:
- TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_DROP_BLOB_IF_NODATA; }
-
- TTxDropBlobIfNoData(TBlobDepot *self, TLogoBlobID id, TActorId assimilatorId)
- : TTransactionBase(self)
- , Id(id)
- , AssimilatorId(assimilatorId)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext&) override {
- const TData::TKey key(Id);
- if (const TData::TValue *v = Self->Data->FindKey(key); v && v->OriginalBlobId &&
- v->KeepState != NKikimrBlobDepot::EKeepState::Keep) {
- Self->Data->DeleteKey(key, txc, this);
- }
- return true;
- }
-
- void Complete(const TActorContext&) override {
- Self->Data->CommitTrash(this);
- TActivationContext::Send(new IEventHandle(TEvPrivate::EvTxComplete, 0, AssimilatorId, {}, nullptr, 0));
- }
- };
-
auto& msg = *ev->Get();
for (ui32 i = 0; i < msg.ResponseSz; ++i) {
auto& resp = msg.Responses[i];
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT34, "got TEvGetResult", (Id, Self->GetLogId()), (BlobId, resp.Id),
(Status, resp.Status), (NumPutsInFlight, NumPutsInFlight));
if (resp.Status == NKikimrProto::OK) {
- auto ev = std::make_unique<TEvBlobStorage::TEvPut>(resp.Id, resp.Buffer, TInstant::Max());
- ev->Decommission = true;
- SendToBSProxy(SelfId(), Self->Config.GetVirtualGroupId(), ev.release());
+ std::vector<ui8> channels(1);
+ Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels);
+ TChannelInfo& channel = Self->Channels[channels.front()];
+ const ui64 value = channel.NextBlobSeqId++;
+ const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value);
+ const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, resp.Id.BlobSize());
+ const ui64 putId = NextPutId++;
+ SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, resp.Buffer, TInstant::Max()), putId);
+ const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing
+ Y_VERIFY(inserted);
+ const bool inserted1 = PutIdToKey.emplace(putId, TData::TKey(resp.Id)).second;
+ Y_VERIFY(inserted1);
++NumPutsInFlight;
} else if (resp.Status == NKikimrProto::NODATA) {
- Self->Execute(std::make_unique<TTxDropBlobIfNoData>(Self, resp.Id, SelfId()));
+ Self->Execute(std::make_unique<TTxCommitAssimilatedBlob>(this, NKikimrProto::NODATA, TBlobSeqId(),
+ TData::TKey(resp.Id)));
++NumPutsInFlight;
}
}
@@ -345,20 +365,13 @@ namespace NKikimr::NBlobDepot {
void TAssimilator::Handle(TEvBlobStorage::TEvPutResult::TPtr ev) {
auto& msg = *ev->Get();
+ const auto it = PutIdToKey.find(ev->Cookie);
+ Y_VERIFY(it != PutIdToKey.end());
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "got TEvPutResult", (Id, Self->GetLogId()), (Msg, msg),
- (NumPutsInFlight, NumPutsInFlight));
- if (!--NumPutsInFlight) {
- IssueCollects();
- }
- }
-
- void TAssimilator::IssueCollects() {
- // FIXME: do it
- Action();
- }
-
- void TAssimilator::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) {
- (void)ev;
+ (NumPutsInFlight, NumPutsInFlight), (Key, it->second));
+ Self->Execute(std::make_unique<TTxCommitAssimilatedBlob>(this, msg.Status, TBlobSeqId::FromLogoBlobId(msg.Id),
+ std::move(it->second)));
+ PutIdToKey.erase(it);
}
void TAssimilator::OnCopyDone() {
@@ -472,6 +485,7 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::StartGroupAssimilator() {
if (Config.GetIsDecommittingGroup()) {
Y_VERIFY(!GroupAssimilatorId);
+ Y_VERIFY(Data->IsLoaded());
GroupAssimilatorId = RegisterWithSameMailbox(new TGroupAssimilator(this));
}
}
diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h
index 9ef88beb3c7..0cde2fc5246 100644
--- a/ydb/core/blob_depot/assimilator.h
+++ b/ydb/core/blob_depot/assimilator.h
@@ -2,6 +2,7 @@
#include "defs.h"
#include "blob_depot_tablet.h"
+#include "data.h"
namespace NKikimr::NBlobDepot {
@@ -29,6 +30,11 @@ namespace NKikimr::NBlobDepot {
TActorId PipeId;
+ ui64 NextPutId = 1;
+ THashMap<ui64, TData::TKey> PutIdToKey;
+
+ class TTxCommitAssimilatedBlob;
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::BLOB_DEPOT_ASSIMILATOR_ACTOR;
@@ -54,8 +60,6 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobStorage::TEvGetResult::TPtr ev);
void HandleTxComplete();
void Handle(TEvBlobStorage::TEvPutResult::TPtr ev);
- void IssueCollects();
- void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev);
void OnCopyDone();
void CreatePipe();
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index af812228820..8c5e1627a2c 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -49,21 +49,24 @@ namespace NKikimr::NBlobDepot {
auto handleFromAgentPipe = [this](auto& ev) {
const auto it = PipeServers.find(ev->Recipient);
Y_VERIFY(it != PipeServers.end());
+ auto& info = it->second;
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT69, "HandleFromAgentPipe", (Id, GetLogId()), (RequestId, ev->Cookie),
- (Postpone, it->second.PostponeFromAgent), (Sender, ev->Sender), (PipeServerId, ev->Recipient));
-
- if (it->second.PostponeFromAgent) {
- it->second.PostponeQ.emplace_back(ev.Release());
- return;
+ (Sender, ev->Sender), (PipeServerId, ev->Recipient), (ProcessThroughQueue, info.ProcessThroughQueue),
+ (NextExpectedMsgId, info.NextExpectedMsgId), (PostponeQ.size, info.PostponeQ.size()));
+
+ if (info.ProcessThroughQueue || !ReadyForAgentQueries()) {
+ info.PostponeQ.emplace_back(ev.Release());
+ info.ProcessThroughQueue = true;
+ } else {
+ // ensure correct ordering of incoming messages
+ Y_VERIFY_S(ev->Cookie == info.NextExpectedMsgId, "message reordering detected Cookie# " << ev->Cookie
+ << " NextExpectedMsgId# " << info.NextExpectedMsgId << " Type# " << Sprintf("%08" PRIx32,
+ ev->GetTypeRewrite()) << " Id# " << GetLogId());
+ ++info.NextExpectedMsgId;
+
+ HandleFromAgent(ev);
}
-
- Y_VERIFY_S(ev->Cookie == it->second.NextExpectedMsgId, "pipe reordering detected Cookie# " << ev->Cookie
- << " NextExpectedMsgId# " << it->second.NextExpectedMsgId << " Type# " << Sprintf("%08" PRIx32,
- ev->GetTypeRewrite()) << " Id# " << GetLogId());
-
- ++it->second.NextExpectedMsgId;
- HandleFromAgent(ev);
};
switch (const ui32 type = ev->GetTypeRewrite()) {
@@ -81,6 +84,8 @@ namespace NKikimr::NBlobDepot {
fFunc(TEvBlobDepot::EvPushNotifyResult, handleFromAgentPipe);
fFunc(TEvBlobDepot::EvCollectGarbage, handleFromAgentPipe);
+ cFunc(TEvPrivate::EvProcessRegisterAgentQ, ProcessRegisterAgentQ);
+
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle);
hFunc(TEvBlobStorage::TEvRangeResult, Data->Handle);
hFunc(TEvBlobStorage::TEvGetResult, Data->UncertaintyResolver->Handle);
@@ -116,6 +121,103 @@ namespace NKikimr::NBlobDepot {
TActor::PassAway();
}
+ void TBlobDepot::InitChannelKinds() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT07, "InitChannelKinds", (Id, GetLogId()));
+
+ TTabletStorageInfo *info = Info();
+ const ui32 generation = Executor()->Generation();
+
+ Y_VERIFY(Channels.empty());
+
+ ui32 channel = 0;
+ for (const auto& profile : Config.GetChannelProfiles()) {
+ for (ui32 i = 0, count = profile.GetCount(); i < count; ++i, ++channel) {
+ const ui32 groupId = info->GroupFor(channel, generation);
+ if (channel >= 2) {
+ const auto kind = profile.GetChannelKind();
+ auto& p = ChannelKinds[kind];
+ p.ChannelToIndex[channel] = p.ChannelGroups.size();
+ p.ChannelGroups.emplace_back(channel, groupId);
+ Channels.push_back({
+ ui8(channel),
+ groupId,
+ kind,
+ &p,
+ {},
+ TBlobSeqId{channel, generation, 1, 0}.ToSequentialNumber(),
+ {},
+ {},
+ {},
+ });
+ Groups[groupId].Channels[kind].push_back(channel);
+ } else {
+ Channels.push_back({
+ ui8(channel),
+ groupId,
+ NKikimrBlobDepot::TChannelKind::System,
+ nullptr,
+ {},
+ 0,
+ {},
+ {},
+ {},
+ });
+ }
+ }
+ }
+ }
+
+ void TBlobDepot::InvalidateGroupForAllocation(ui32 groupId) {
+ const auto groupIt = Groups.find(groupId);
+ Y_VERIFY(groupIt != Groups.end());
+ const auto& group = groupIt->second;
+ for (const auto& [kind, channels] : group.Channels) {
+ const auto kindIt = ChannelKinds.find(kind);
+ Y_VERIFY(kindIt != ChannelKinds.end());
+ auto& kindv = kindIt->second;
+ kindv.GroupAccumWeights.clear(); // invalidate
+ }
+ }
+
+ void TBlobDepot::PickChannels(NKikimrBlobDepot::TChannelKind::E kind, std::vector<ui8>& channels) {
+ const auto kindIt = ChannelKinds.find(kind);
+ Y_VERIFY(kindIt != ChannelKinds.end());
+ auto& kindv = kindIt->second;
+
+ if (kindv.GroupAccumWeights.empty()) {
+ // recalculate group weights
+ ui64 accum = 0;
+ THashSet<ui32> seenGroups;
+ for (const auto& [channel, groupId] : kindv.ChannelGroups) {
+ if (const auto& [_, inserted] = seenGroups.insert(groupId); inserted) {
+ accum += SpaceMonitor->GetGroupAllocationWeight(groupId);
+ kindv.GroupAccumWeights.emplace_back(groupId, accum);
+ }
+ }
+ Y_VERIFY(!kindv.GroupAccumWeights.empty());
+ }
+
+ const auto [_, accum] = kindv.GroupAccumWeights.back();
+ for (ui8& channel : channels) {
+ const ui64 random = RandomNumber(accum);
+ const auto comp = [](ui64 x, const auto& y) { return x < std::get<1>(y); };
+ const auto it = std::upper_bound(kindv.GroupAccumWeights.begin(), kindv.GroupAccumWeights.end(), random, comp);
+ Y_VERIFY(it != kindv.GroupAccumWeights.end());
+ const auto [groupId, _] = *it;
+
+ const auto groupIt = Groups.find(groupId);
+ Y_VERIFY(groupIt != Groups.end());
+ auto& group = groupIt->second;
+
+ const auto channelsIt = group.Channels.find(kind);
+ Y_VERIFY(channelsIt != group.Channels.end());
+ const auto& channels = channelsIt->second;
+
+ const size_t channelIndex = RandomNumber(channels.size());
+ channel = channels[channelIndex];
+ }
+ }
+
IActor *CreateBlobDepot(const TActorId& tablet, TTabletStorageInfo *info) {
return new TBlobDepot(tablet, info);
}
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index 98b62f4f1f8..e229220259d 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -25,6 +25,7 @@ namespace NKikimr::NBlobDepot {
EvCommitCertainKeys,
EvDoGroupMetricsExchange,
EvKickSpaceMonitor,
+ EvProcessRegisterAgentQ,
};
};
@@ -81,28 +82,35 @@ namespace NKikimr::NBlobDepot {
std::optional<ui32> NodeId; // as reported by RegisterAgent
ui64 NextExpectedMsgId = 1;
std::deque<std::unique_ptr<IEventHandle>> PostponeQ;
- bool PostponeFromAgent = false;
+ bool ProcessThroughQueue = false;
};
THashMap<TActorId, TPipeServerContext> PipeServers;
THashMap<ui32, TAgent> Agents; // NodeId -> Agent
+ struct TChannelKind : NBlobDepot::TChannelKind {
+ std::vector<std::tuple<ui32, ui64>> GroupAccumWeights; // last one is the total weight
+ };
+
THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds;
struct TChannelInfo {
ui8 Index;
+ ui32 GroupId;
NKikimrBlobDepot::TChannelKind::E ChannelKind;
TChannelKind *KindPtr;
TGivenIdRange GivenIdRanges; // accumulated through all agents
ui64 NextBlobSeqId = 0;
std::set<ui64> SequenceNumbersInFlight; // of blobs being committed
+ std::set<ui64> AssimilatedBlobsInFlight;
std::optional<TBlobSeqId> LastReportedLeastId;
// Obtain the least BlobSeqId that is not yet committed, but may be written by any agent
TBlobSeqId GetLeastExpectedBlobId(ui32 generation) {
const auto result = TBlobSeqId::FromSequentalNumber(Index, generation, Min(NextBlobSeqId,
GivenIdRanges.IsEmpty() ? Max<ui64>() : GivenIdRanges.GetMinimumValue(),
- SequenceNumbersInFlight.empty() ? Max<ui64>() : *SequenceNumbersInFlight.begin()));
+ SequenceNumbersInFlight.empty() ? Max<ui64>() : *SequenceNumbersInFlight.begin(),
+ AssimilatedBlobsInFlight.empty() ? Max<ui64>() : *AssimilatedBlobsInFlight.begin()));
// this value can't decrease, because it may lead to data loss
Y_VERIFY_S(!LastReportedLeastId || *LastReportedLeastId <= result,
"decreasing LeastExpectedBlobId"
@@ -110,7 +118,8 @@ namespace NKikimr::NBlobDepot {
<< " result# " << result.ToString()
<< " NextBlobSeqId# " << NextBlobSeqId
<< " GivenIdRanges# " << GivenIdRanges.ToString()
- << " SequenceNumbersInFlight# " << FormatList(SequenceNumbersInFlight));
+ << " SequenceNumbersInFlight# " << FormatList(SequenceNumbersInFlight)
+ << " AssimilatedBlobsInFlight# " << FormatList(AssimilatedBlobsInFlight));
LastReportedLeastId.emplace(result);
return result;
}
@@ -118,6 +127,7 @@ namespace NKikimr::NBlobDepot {
std::vector<TChannelInfo> Channels;
struct TGroupInfo {
+ THashMap<NKikimrBlobDepot::TChannelKind::E, std::vector<ui8>> Channels;
ui64 AllocatedBytes = 0;
};
THashMap<ui32, TGroupInfo> Groups;
@@ -136,6 +146,10 @@ namespace NKikimr::NBlobDepot {
void ProcessRegisterAgentQ();
+ bool ReadyForAgentQueries() const {
+ return Configured && (!Config.GetIsDecommittingGroup() || DecommitState >= EDecommitState::BlocksFinished);
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void Enqueue(TAutoPtr<IEventHandle>& ev, const TActorContext&) override {
@@ -182,6 +196,8 @@ namespace NKikimr::NBlobDepot {
void PassAway() override;
void InitChannelKinds();
+ void InvalidateGroupForAllocation(ui32 groupId);
+ void PickChannels(NKikimrBlobDepot::TChannelKind::E kind, std::vector<ui8>& channels);
TString GetLogId() const {
const auto *executor = Executor();
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 3bc3899d79a..ae62f4d6bdd 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -194,6 +194,7 @@ namespace NKikimr::NBlobDepot {
void TData::UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "UpdateKey", (Id, Self->GetLogId()), (Key, key), (Item, item));
+ Y_VERIFY(Loaded || IsKeyLoaded(key));
UpdateKey(key, txc, cookie, "UpdateKey", [&](TValue& value, bool inserted) {
if (!inserted) { // update value items
value.Meta = item.GetMeta();
@@ -205,15 +206,30 @@ namespace NKikimr::NBlobDepot {
auto *chain = value.ValueChain.Add();
auto *locator = chain->MutableLocator();
locator->CopyFrom(item.GetBlobLocator());
-
- // reset original blob id, if any
- value.OriginalBlobId.reset();
}
return EUpdateOutcome::CHANGE;
}, item);
}
+ void TData::BindToBlob(const TKey& key, TBlobSeqId blobSeqId, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "BindToBlob", (Id, Self->GetLogId()), (Key, key), (BlobSeqId, blobSeqId));
+ Y_VERIFY(Loaded || IsKeyLoaded(key));
+ UpdateKey(key, txc, cookie, "BindToBlob", [&](TValue& value, bool inserted) {
+ if (!value.ValueChain.empty()) {
+ return EUpdateOutcome::NO_CHANGE;
+ } else {
+ auto *chain = value.ValueChain.Add();
+ auto *locator = chain->MutableLocator();
+ locator->SetGroupId(Self->Info()->GroupFor(blobSeqId.Channel, blobSeqId.Generation));
+ blobSeqId.ToProto(locator->MutableBlobSeqId());
+ locator->SetTotalDataLen(key.GetBlobId().BlobSize());
+ locator->SetFooterLen(0);
+ return inserted ? EUpdateOutcome::DROP : EUpdateOutcome::CHANGE;
+ }
+ });
+ }
+
void TData::MakeKeyCertain(const TKey& key) {
const auto it = Data.find(key);
Y_VERIFY(it != Data.end());
@@ -321,25 +337,23 @@ namespace NKikimr::NBlobDepot {
ValidateRecords();
}
- void TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
+ bool TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
- UpdateKey(TKey(blob.Id), txc, cookie, "AddDataOnDecommit", [&](TValue& value, bool inserted) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "AddDataOnDecommit", (Id, Self->GetLogId()), (Blob, blob),
- (Value, value), (Inserted, inserted));
+ Y_VERIFY(Loaded || IsKeyLoaded(TKey(blob.Id)));
+
+ return UpdateKey(TKey(blob.Id), txc, cookie, "AddDataOnDecommit", [&](TValue& value, bool inserted) {
+ bool change = inserted;
// update keep state if necessary
if (blob.DoNotKeep && value.KeepState < EKeepState::DoNotKeep) {
value.KeepState = EKeepState::DoNotKeep;
+ change = true;
} else if (blob.Keep && value.KeepState < EKeepState::Keep) {
value.KeepState = EKeepState::Keep;
+ change = true;
}
- // if there is not value chain for this blob, map it to the original blob id
- if (value.ValueChain.empty()) {
- value.OriginalBlobId = blob.Id;
- }
-
- return EUpdateOutcome::CHANGE;
+ return change ? EUpdateOutcome::CHANGE : EUpdateOutcome::NO_CHANGE;
});
}
@@ -359,8 +373,8 @@ namespace NKikimr::NBlobDepot {
record.LastConfirmedGenStep = confirmedGenStep;
}
- bool TData::UpdateKeepState(TKey key, EKeepState keepState,
- NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
+ bool TData::UpdateKeepState(TKey key, EKeepState keepState, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
+ Y_VERIFY(Loaded || IsKeyLoaded(key));
return UpdateKey(std::move(key), txc, cookie, "UpdateKeepState", [&](TValue& value, bool inserted) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT51, "UpdateKeepState", (Id, Self->GetLogId()), (Key, key),
(KeepState, keepState), (Value, value));
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 1f00909bcda..024ae6c0136 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -223,7 +223,6 @@ namespace NKikimr::NBlobDepot {
TValueChain ValueChain;
EKeepState KeepState = EKeepState::Default;
bool Public = false;
- std::optional<TLogoBlobID> OriginalBlobId;
bool UncertainWrite = false;
TValue() = default;
@@ -238,9 +237,6 @@ namespace NKikimr::NBlobDepot {
, ValueChain(std::move(*proto.MutableValueChain()))
, KeepState(proto.GetKeepState())
, Public(proto.GetPublic())
- , OriginalBlobId(proto.HasOriginalBlobId()
- ? std::make_optional(LogoBlobIDFromLogoBlobID(proto.GetOriginalBlobId()))
- : std::nullopt)
, UncertainWrite(uncertainWrite)
{}
@@ -277,9 +273,6 @@ namespace NKikimr::NBlobDepot {
if (Public != proto->GetPublic()) {
proto->SetPublic(Public);
}
- if (OriginalBlobId) {
- LogoBlobIDFromLogoBlobID(*OriginalBlobId, proto->MutableOriginalBlobId());
- }
}
static bool Validate(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item);
@@ -305,7 +298,6 @@ namespace NKikimr::NBlobDepot {
<< " ValueChain# " << FormatList(ValueChain)
<< " KeepState# " << EKeepState_Name(KeepState)
<< " Public# " << (Public ? "true" : "false")
- << " OriginalBlobId# " << (OriginalBlobId ? OriginalBlobId->ToString() : "<none>")
<< " UncertainWrite# " << (UncertainWrite ? "true" : "false")
<< "}";
}
@@ -363,8 +355,9 @@ namespace NKikimr::NBlobDepot {
struct TResolveDecommitContext {
TEvBlobDepot::TEvResolve::TPtr Ev; // original resolve request
- ui32 NumRangesInFlight = 0;
- bool Errors = false;
+ ui32 NumRangesInFlight;
+ std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs = {};
+ TIntervalMap<TLogoBlobID> Errors = {};
};
ui64 LastRangeId = 0;
THashMap<ui64, TResolveDecommitContext> ResolveDecommitContexts;
@@ -446,6 +439,8 @@ namespace NKikimr::NBlobDepot {
void UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
+ void BindToBlob(const TKey& key, TBlobSeqId blobSeqId, NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
+
void MakeKeyCertain(const TKey& key);
void HandleCommitCertainKeys();
@@ -454,7 +449,7 @@ namespace NKikimr::NBlobDepot {
void AddLoadSkip(TKey key);
void AddDataOnLoad(TKey key, TString value, bool uncertainWrite, bool skip);
- void AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
+ bool AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
void AddTrashOnLoad(TLogoBlobID id);
void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep);
diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp
index bfffd2dba53..ed11f6ba471 100644
--- a/ydb/core/blob_depot/data_resolve.cpp
+++ b/ydb/core/blob_depot/data_resolve.cpp
@@ -7,6 +7,13 @@ namespace NKikimr::NBlobDepot {
using TData = TBlobDepot::TData;
+ namespace {
+ TLogoBlobID BlobIdUpperBound(TLogoBlobID id) {
+ return TLogoBlobID(id.TabletID(), id.Generation(), id.Step(), id.Channel(), TLogoBlobID::MaxBlobSize,
+ id.Cookie(), TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode);
+ }
+ }
+
TData::TResolveResultAccumulator::TResolveResultAccumulator(TEventHandle<TEvBlobDepot::TEvResolve>& ev)
: Sender(ev.Sender)
, Recipient(ev.Recipient)
@@ -83,6 +90,10 @@ namespace NKikimr::NBlobDepot {
class TData::TTxResolve : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
std::unique_ptr<TEvBlobDepot::TEvResolve::THandle> Request;
+ std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs;
+ TIntervalMap<TLogoBlobID> Errors;
+
+ bool KeysLoaded = false;
int ItemIndex = 0;
std::optional<TKey> LastScannedKey;
ui32 NumKeysRead = 0; // number of keys already read for this item
@@ -95,15 +106,22 @@ namespace NKikimr::NBlobDepot {
public:
TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_RESOLVE; }
- TTxResolve(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr request)
+ TTxResolve(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr request,
+ std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob>&& decommitBlobs = {},
+ TIntervalMap<TLogoBlobID>&& errors = {})
: TTransactionBase(self)
, Request(request.Release())
+ , DecommitBlobs(std::move(decommitBlobs))
+ , Errors(std::move(errors))
, Result(*Request)
{}
TTxResolve(TTxResolve& predecessor)
: TTransactionBase(predecessor.Self)
, Request(std::move(predecessor.Request))
+ , DecommitBlobs(std::move(predecessor.DecommitBlobs))
+ , Errors(std::move(predecessor.Errors))
+ , KeysLoaded(predecessor.KeysLoaded)
, ItemIndex(predecessor.ItemIndex)
, LastScannedKey(std::move(predecessor.LastScannedKey))
, NumKeysRead(predecessor.NumKeysRead)
@@ -148,18 +166,71 @@ namespace NKikimr::NBlobDepot {
}
bool Execute(TTransactionContext& txc, const TActorContext&) override {
- NIceDb::TNiceDb db(txc.DB);
-
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT22, "TTxResolve::Execute", (Id, Self->GetLogId()),
(Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex),
- (LastScannedKey, LastScannedKey));
+ (LastScannedKey, LastScannedKey), (DecommitBlobs.size, DecommitBlobs.size()));
+
+ bool progress = false;
+ if (!KeysLoaded && !LoadKeys(txc, progress)) {
+ return progress;
+ } else {
+ KeysLoaded = true;
+ }
+
+ for (ui32 numItemsRemain = 10'000; !DecommitBlobs.empty(); DecommitBlobs.pop_front()) {
+ if (numItemsRemain) {
+ numItemsRemain -= Self->Data->AddDataOnDecommit(DecommitBlobs.front(), txc, this);
+ } else {
+ SuccessorTx = true;
+ return true;
+ }
+ }
+
+ const auto& record = Request->Get()->Record;
+ for (const auto& item : record.GetItems()) {
+ std::optional<ui64> cookie = item.HasCookie() ? std::make_optional(item.GetCookie()) : std::nullopt;
+
+ std::optional<TKey> begin;
+ std::optional<TKey> end;
+ TScanFlags flags;
+ ui64 maxKeys;
+ const bool success = GetScanParams(item, &begin, &end, &flags, &maxKeys);
+ Y_VERIFY_DEBUG(success);
+
+ // we have everything we need contained in memory, generate response from memory
+ auto callback = [&](const TKey& key, const TValue& value) {
+ IssueResponseItem(cookie, key, value);
+ return --maxKeys != 0;
+ };
+ Self->Data->ScanRange(begin ? &begin.value() : nullptr, end ? &end.value() : nullptr, flags, callback);
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT30, "TTxResolve::Complete", (Id, Self->GetLogId()),
+ (Sender, Request->Sender), (Cookie, Request->Cookie), (SuccessorTx, SuccessorTx),
+ (Uncertainties.size, Uncertainties.size()));
+
+ Self->Data->CommitTrash(this);
+
+ if (SuccessorTx) {
+ Self->Execute(std::make_unique<TTxResolve>(*this));
+ } else if (Uncertainties.empty()) {
+ Result.Send(NKikimrProto::OK, std::nullopt);
+ } else {
+ Self->Data->UncertaintyResolver->PushResultWithUncertainties(std::move(Result), std::move(Uncertainties));
+ }
+ }
+
+ bool LoadKeys(TTransactionContext& txc, bool& progress) {
+ NIceDb::TNiceDb db(txc.DB);
- bool progress = false; // have we made some progress during scan?
const auto& record = Request->Get()->Record;
const auto& items = record.GetItems();
for (; ItemIndex < items.size(); ++ItemIndex, LastScannedKey.reset(), NumKeysRead = 0) {
const auto& item = items[ItemIndex];
- std::optional<ui64> cookie = item.HasCookie() ? std::make_optional(item.GetCookie()) : std::nullopt;
std::optional<TKey> begin;
std::optional<TKey> end;
@@ -180,12 +251,6 @@ namespace NKikimr::NBlobDepot {
}
if (Self->Data->Loaded || (end && Self->Data->LastLoadedKey && *end <= *Self->Data->LastLoadedKey)) {
- // we have everything we need contained in memory, generate response from memory
- auto callback = [&](const TKey& key, const TValue& value) {
- IssueResponseItem(cookie, key, value);
- return ++NumKeysRead != maxKeys;
- };
- Self->Data->ScanRange(begin ? &begin.value() : nullptr, end ? &end.value() : nullptr, flags, callback);
continue;
}
@@ -193,8 +258,7 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(!end || *Self->Data->LastLoadedKey < *end);
// special case -- forward scan and we have some data in memory
- auto callback = [&](const TKey& key, const TValue& value) {
- IssueResponseItem(cookie, key, value);
+ auto callback = [&](const TKey& key, const TValue& /*value*/) {
LastScannedKey = key;
return ++NumKeysRead != maxKeys;
};
@@ -206,7 +270,7 @@ namespace NKikimr::NBlobDepot {
flags &= ~EScanFlags::INCLUDE_BEGIN;
// check if we have read all the keys requested
- if (NumKeysRead == maxKeys) {
+ if (maxKeys && NumKeysRead == maxKeys) {
continue;
}
}
@@ -234,7 +298,6 @@ namespace NKikimr::NBlobDepot {
if (matchBegin && matchEnd) {
const TValue *value = Self->Data->FindKey(key);
Y_VERIFY(value); // value must exist as it was just loaded into memory and exists in the database
- IssueResponseItem(cookie, key, *value);
if (++NumKeysRead == maxKeys) {
// we have hit the MaxItems limit, exit
return true;
@@ -277,20 +340,6 @@ namespace NKikimr::NBlobDepot {
return true;
}
- void Complete(const TActorContext&) override {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT30, "TTxResolve::Complete", (Id, Self->GetLogId()),
- (Sender, Request->Sender), (Cookie, Request->Cookie), (SuccessorTx, SuccessorTx),
- (Uncertainties.size, Uncertainties.size()));
-
- if (SuccessorTx) {
- Self->Execute(std::make_unique<TTxResolve>(*this));
- } else if (Uncertainties.empty()) {
- Result.Send(NKikimrProto::OK, std::nullopt);
- } else {
- Self->Data->UncertaintyResolver->PushResultWithUncertainties(std::move(Result), std::move(Uncertainties));
- }
- }
-
void IssueResponseItem(std::optional<ui64> cookie, const TKey& key, const TValue& value) {
NKikimrBlobDepot::TEvResolveResult::TResolvedKey item;
@@ -298,31 +347,40 @@ namespace NKikimr::NBlobDepot {
item.SetCookie(*cookie);
}
item.SetKey(key.MakeBinaryKey());
- EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](const TLogoBlobID& id, ui32 begin, ui32 end) {
- if (begin != end) {
- auto *out = item.AddValueChain();
- out->SetGroupId(Self->Info()->GroupFor(id.Channel(), id.Generation()));
- LogoBlobIDFromLogoBlobID(id, out->MutableBlobId());
- if (begin) {
- out->SetSubrangeBegin(begin);
- }
- if (end != id.BlobSize()) {
- out->SetSubrangeEnd(end);
- }
- }
- });
- if (value.OriginalBlobId) {
- Y_VERIFY(item.GetValueChain().empty());
+ if (value.ValueChain.empty() && Self->Config.GetIsDecommittingGroup()) {
+ Y_VERIFY(!value.UncertainWrite);
auto *out = item.AddValueChain();
out->SetGroupId(Self->Config.GetVirtualGroupId());
- LogoBlobIDFromLogoBlobID(*value.OriginalBlobId, out->MutableBlobId());
- Y_VERIFY(!value.UncertainWrite);
+ LogoBlobIDFromLogoBlobID(key.GetBlobId(), out->MutableBlobId());
+ } else {
+ EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](const TLogoBlobID& id, ui32 begin, ui32 end) {
+ if (begin != end) {
+ auto *out = item.AddValueChain();
+ out->SetGroupId(Self->Info()->GroupFor(id.Channel(), id.Generation()));
+ LogoBlobIDFromLogoBlobID(id, out->MutableBlobId());
+ if (begin) {
+ out->SetSubrangeBegin(begin);
+ }
+ if (end != id.BlobSize()) {
+ out->SetSubrangeEnd(end);
+ }
+ }
+ });
}
if (value.Meta) {
item.SetMeta(value.Meta.data(), value.Meta.size());
}
- if (!item.ValueChainSize()) {
+ bool foundError = false;
+
+ if (Errors) {
+ const TLogoBlobID id = key.GetBlobId();
+ foundError = TIntervalSet<TLogoBlobID>(id, BlobIdUpperBound(id)).IsSubsetOf(Errors);
+ }
+
+ if (foundError) {
+ item.SetErrorReason("item resolution error");
+ } else if (!item.ValueChainSize()) {
STLOG(PRI_WARN, BLOB_DEPOT, BDT48, "empty ValueChain on Resolve", (Id, Self->GetLogId()),
(Key, key), (Value, value), (Item, item), (Sender, Request->Sender), (Cookie, Request->Cookie));
}
@@ -389,7 +447,7 @@ namespace NKikimr::NBlobDepot {
}
if (minId == maxId) {
const auto it = Data.find(TKey(minId));
- if (it != Data.end() && (!it->second.ValueChain.empty() || it->second.OriginalBlobId)) {
+ if (it != Data.end() && !it->second.ValueChain.empty()) {
continue; // fast path for extreme queries
}
}
@@ -404,12 +462,8 @@ namespace NKikimr::NBlobDepot {
TInstant::Max(), true);
ev->Decommission = true;
- const auto& tabletId_ = tabletId;
- const auto& minId_ = minId;
- const auto& maxId_ = maxId;
- const auto& mustRestoreFirst_ = mustRestoreFirst;
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "going to TEvRange", (Id, Self->GetLogId()), (TabletId, tabletId_),
- (MinId, minId_), (MaxId, maxId_), (MustRestoreFirst, mustRestoreFirst_));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "going to TEvRange", (Id, Self->GetLogId()), (TabletId, tabletId),
+ (MinId, minId), (MaxId, maxId), (MustRestoreFirst, mustRestoreFirst), (Cookie, id));
SendToBSProxy(Self->SelfId(), Self->Config.GetVirtualGroupId(), ev.release(), id);
}
ResolveDecommitContexts[id] = {ev, (ui32)queries.size()};
@@ -421,54 +475,27 @@ namespace NKikimr::NBlobDepot {
}
void TData::Handle(TEvBlobStorage::TEvRangeResult::TPtr ev) {
- class TTxCommitRange : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- TEvBlobStorage::TEvRangeResult::TPtr Ev;
-
- public:
- TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_COMMIT_RANGE; }
-
- TTxCommitRange(TBlobDepot *self, TEvBlobStorage::TEvRangeResult::TPtr ev)
- : TTransactionBase(self)
- , Ev(ev)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext&) override {
- if (Ev->Get()->Status == NKikimrProto::OK) {
- for (const auto& response : Ev->Get()->Responses) {
- Self->Data->AddDataOnDecommit({
- .Id = response.Id,
- .Keep = response.Keep,
- .DoNotKeep = response.DoNotKeep
- }, txc, this);
- }
- }
- return true;
- }
+ auto& msg = *ev->Get();
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT50, "TEvRangeResult", (Id, Self->GetLogId()), (Msg, msg), (Cookie, ev->Cookie));
- void Complete(const TActorContext&) override {
- Self->Data->CommitTrash(this);
+ auto& contexts = Self->Data->ResolveDecommitContexts;
+ if (const auto it = contexts.find(ev->Cookie); it != contexts.end()) {
+ auto& context = it->second;
- auto& contexts = Self->Data->ResolveDecommitContexts;
- if (const auto it = contexts.find(Ev->Cookie); it != contexts.end()) {
- TResolveDecommitContext& context = it->second;
- if (Ev->Get()->Status != NKikimrProto::OK) {
- context.Errors = true;
- }
- if (!--context.NumRangesInFlight) {
- if (context.Errors) {
- auto [response, record] = TEvBlobDepot::MakeResponseFor(*context.Ev, NKikimrProto::ERROR,
- "errors in range queries");
- TActivationContext::Send(response.release());
- } else {
- Self->Execute(std::make_unique<TTxResolve>(Self, context.Ev));
- }
- }
- contexts.erase(it);
+ if (msg.Status == NKikimrProto::OK) {
+ for (const auto& response : msg.Responses) {
+ context.DecommitBlobs.push_back({response.Id, response.Keep, response.DoNotKeep});
}
+ } else {
+ context.Errors.Add(msg.From, BlobIdUpperBound(msg.To));
}
- };
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT50, "TEvRangeResult", (Id, Self->GetLogId()), (Msg, *ev->Get()));
- Self->Execute(std::make_unique<TTxCommitRange>(Self, ev));
+
+ if (!--context.NumRangesInFlight) {
+ Self->Execute(std::make_unique<TTxResolve>(Self, context.Ev, std::move(context.DecommitBlobs),
+ std::move(context.Errors)));
+ contexts.erase(it);
+ }
+ }
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h
index 6e201abe902..809281a0e85 100644
--- a/ydb/core/blob_depot/defs.h
+++ b/ydb/core/blob_depot/defs.h
@@ -14,6 +14,7 @@
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/protos/blob_depot.pb.h>
#include <ydb/core/protos/counters_blob_depot.pb.h>
+#include <ydb/core/util/interval_set.h>
#include <ydb/core/util/format.h>
#include <ydb/core/util/stlog.h>
diff --git a/ydb/core/blob_depot/space_monitor.cpp b/ydb/core/blob_depot/space_monitor.cpp
index 27d77a4a7b0..2809d3a4c79 100644
--- a/ydb/core/blob_depot/space_monitor.cpp
+++ b/ydb/core/blob_depot/space_monitor.cpp
@@ -22,6 +22,7 @@ namespace NKikimr::NBlobDepot {
if (msg.Status == NKikimrProto::OK) {
group.StatusFlags = msg.StatusFlags;
group.ApproximateFreeSpaceShare = msg.ApproximateFreeSpaceShare;
+ Self->InvalidateGroupForAllocation(groupId);
}
}
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index 069de64e6a4..72714424d63 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -69,6 +69,15 @@ namespace NKikimr::NBlobDepot {
};
}
+ static TBlobSeqId FromLogoBlobId(TLogoBlobID id) {
+ return TBlobSeqId{
+ id.Channel(),
+ id.Generation(),
+ id.Step(),
+ IndexFromCookie(id.Cookie())
+ };
+ }
+
void ToProto(NKikimrBlobDepot::TBlobSeqId *proto) const {
proto->SetChannel(Channel);
proto->SetGeneration(Generation);
@@ -94,6 +103,14 @@ namespace NKikimr::NBlobDepot {
Y_FAIL();
}
+
+ static ui32 IndexFromCookie(ui32 cookie) {
+ static constexpr ui32 typeBits = 24 - IndexBits;
+ const auto type = static_cast<EBlobType>(cookie & ((1 << typeBits) - 1));
+ Y_VERIFY(type == EBlobType::VG_COMPOSITE_BLOB || type == EBlobType::VG_DATA_BLOB ||
+ type == EBlobType::VG_FOOTER_BLOB || type == EBlobType::VG_GC_BLOB);
+ return cookie >> typeBits;
+ }
};
class TGivenIdRange {
diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h
index 8a9b182e959..2b671db14a2 100644
--- a/ydb/core/mind/bscontroller/impl.h
+++ b/ydb/core/mind/bscontroller/impl.h
@@ -1502,7 +1502,7 @@ private:
}
TVSlotInfo* FindVSlot(TVDiskID id) { // GroupGeneration may be zero
- if (TGroupInfo *group = FindGroup(id.GroupID)) {
+ if (TGroupInfo *group = FindGroup(id.GroupID); group && !group->VDisksInGroup.empty()) {
const ui32 index = group->Topology->GetOrderNumber(id);
const TVSlotInfo *slot = group->VDisksInGroup[index];
Y_VERIFY(slot->GetShortVDiskId() == TVDiskIdShort(id)); // sanity check
diff --git a/ydb/core/mind/bscontroller/load_everything.cpp b/ydb/core/mind/bscontroller/load_everything.cpp
index d370d27a66b..a6d75e057f7 100644
--- a/ydb/core/mind/bscontroller/load_everything.cpp
+++ b/ydb/core/mind/bscontroller/load_everything.cpp
@@ -200,6 +200,9 @@ public:
std::get<2>(geom));
group.DecommitStatus = groups.GetValueOrDefault<T::DecommitStatus>();
+ if (group.DecommitStatus == NKikimrBlobStorage::TGroupDecommitStatus::DONE) {
+ group.VDisksInGroup.clear();
+ }
#define OPTIONAL(NAME) \
if (groups.HaveValue<T::NAME>()) { \
diff --git a/ydb/core/protos/counters_blob_depot.proto b/ydb/core/protos/counters_blob_depot.proto
index 698768b083d..16225814a81 100644
--- a/ydb/core/protos/counters_blob_depot.proto
+++ b/ydb/core/protos/counters_blob_depot.proto
@@ -42,7 +42,7 @@ enum EPercentileCounters {
enum ETxTypes {
TXTYPE_PUT_ASSIMILATED_DATA = 0 [(NKikimr.TxTypeOpts) = {Name: "TTxPutAssimilatedData"}];
- TXTYPE_DROP_BLOB_IF_NODATA = 1 [(NKikimr.TxTypeOpts) = {Name: "TTxDropBlobIfNoData"}];
+ TXTYPE_COMMIT_ASSIMILATED_BLOB = 1 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitAssimilatedBlob"}];
TXTYPE_FINISH_COPYING = 2 [(NKikimr.TxTypeOpts) = {Name: "TTxFinishCopying"}];
TXTYPE_FINISH_DECOMMISSION = 3 [(NKikimr.TxTypeOpts) = {Name: "TTxFinishDecommission"}];
TXTYPE_MON_DATA = 4 [(NKikimr.TxTypeOpts) = {Name: "TTxMonData"}];
@@ -53,9 +53,8 @@ enum ETxTypes {
TXTYPE_CONFIRM_GC = 9 [(NKikimr.TxTypeOpts) = {Name: "TTxConfirmGC"}];
TXTYPE_COMMIT_CERTAIN_KEYS = 10 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitCertainKeys"}];
TXTYPE_RESOLVE = 11 [(NKikimr.TxTypeOpts) = {Name: "TTxResolve"}];
- TXTYPE_COMMIT_RANGE = 12 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitRange"}];
- TXTYPE_DATA_LOAD = 13 [(NKikimr.TxTypeOpts) = {Name: "TTxDataLoad"}];
- TXTYPE_COLLECT_GARBAGE = 14 [(NKikimr.TxTypeOpts) = {Name: "TTxCollectGarbage"}];
- TXTYPE_COMMIT_BLOB_SEQ = 15 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitBlobSeq"}];
- TXTYPE_UPDATE_BLOCK = 16 [(NKikimr.TxTypeOpts) = {Name: "TTxUpdateBlock"}];
+ TXTYPE_DATA_LOAD = 12 [(NKikimr.TxTypeOpts) = {Name: "TTxDataLoad"}];
+ TXTYPE_COLLECT_GARBAGE = 13 [(NKikimr.TxTypeOpts) = {Name: "TTxCollectGarbage"}];
+ TXTYPE_COMMIT_BLOB_SEQ = 14 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitBlobSeq"}];
+ TXTYPE_UPDATE_BLOCK = 15 [(NKikimr.TxTypeOpts) = {Name: "TTxUpdateBlock"}];
}