diff options
author | alexvru <alexvru@ydb.tech> | 2022-12-30 19:51:48 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-12-30 19:51:48 +0300 |
commit | e8ac1f63d70588f605a4b93b64e6c07931702531 (patch) | |
tree | 892e9a05bb552e496c0864b4061f4419b90d14df | |
parent | e91537d589eda3fbbaaa9c6bb035f8d9856cb5a1 (diff) | |
download | ydb-e8ac1f63d70588f605a4b93b64e6c07931702531.tar.gz |
Fix BlobDepot group assimilation
-rw-r--r-- | ydb/core/blob_depot/agent.cpp | 143 | ||||
-rw-r--r-- | ydb/core/blob_depot/assimilator.cpp | 144 | ||||
-rw-r--r-- | ydb/core/blob_depot/assimilator.h | 8 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot.cpp | 126 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_tablet.h | 22 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 44 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 17 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_resolve.cpp | 225 | ||||
-rw-r--r-- | ydb/core/blob_depot/defs.h | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/space_monitor.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/types.h | 17 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/impl.h | 2 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/load_everything.cpp | 3 | ||||
-rw-r--r-- | ydb/core/protos/counters_blob_depot.proto | 11 |
14 files changed, 439 insertions, 325 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index b1739b56d40..edc014ed8fc 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -33,14 +33,6 @@ namespace NKikimr::NBlobDepot { } void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) { - if (!Configured || (Config.GetIsDecommittingGroup() && DecommitState < EDecommitState::BlocksFinished)) { - const auto it = PipeServers.find(ev->Recipient); - Y_VERIFY(it != PipeServers.end()); - it->second.PostponeFromAgent = true; - it->second.PostponeQ.emplace_back(ev.Release()); - return; - } - const ui32 nodeId = ev->Sender.NodeId(); const TActorId& pipeServerId = ev->Recipient; const auto& req = ev->Get()->Record; @@ -142,66 +134,35 @@ namespace NKikimr::NBlobDepot { const ui32 generation = Executor()->Generation(); auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, ev->Get()->Record.GetChannelKind(), generation); - if (const auto it = ChannelKinds.find(record->GetChannelKind()); it != ChannelKinds.end()) { - auto& kind = it->second; - auto *givenIdRange = record->MutableGivenIdRange(); - - struct TGroupInfo { - std::vector<TChannelInfo*> Channels; - }; - std::unordered_map<ui32, TGroupInfo> groups; - - for (const auto& [channel, groupId] : kind.ChannelGroups) { - Y_VERIFY_DEBUG(channel < Channels.size() && Channels[channel].ChannelKind == it->first); - groups[groupId].Channels.push_back(&Channels[channel]); - } + auto *givenIdRange = record->MutableGivenIdRange(); - std::vector<std::tuple<ui64, const TGroupInfo*>> options; + std::vector<ui8> channels(ev->Get()->Record.GetCount()); + PickChannels(record->GetChannelKind(), channels); - ui64 accum = 0; - for (const auto& [groupId, group] : groups) { - if (const ui64 w = SpaceMonitor->GetGroupAllocationWeight(groupId)) { - accum += w; - options.emplace_back(accum, &group); - } - } + THashMap<ui8, NKikimrBlobDepot::TGivenIdRange::TChannelRange*> issuedRanges; + for (ui8 channelIndex : channels) { + TChannelInfo& channel = Channels[channelIndex]; + const ui64 value = channel.NextBlobSeqId++; - if (accum) { - THashMap<ui8, NKikimrBlobDepot::TGivenIdRange::TChannelRange*> issuedRanges; - for (ui32 i = 0, count = ev->Get()->Record.GetCount(); i < count; ++i) { - const ui64 selection = RandomNumber(accum); - const auto it = std::upper_bound(options.begin(), options.end(), selection, - [](ui64 x, const auto& y) { return x < std::get<0>(y); }); - const auto& [_, group] = *it; - - const size_t channelIndex = RandomNumber(group->Channels.size()); - TChannelInfo* const channel = group->Channels[channelIndex]; - - const ui64 value = channel->NextBlobSeqId++; - - // fill in range item - auto& range = issuedRanges[channel->Index]; - if (!range || range->GetEnd() != value) { - range = givenIdRange->AddChannelRanges(); - range->SetChannel(channel->Index); - range->SetBegin(value); - } - range->SetEnd(value + 1); - } - } else { - Y_VERIFY_DEBUG(false); // TODO(alexvru): handle this situation somehow -- agent needs to retry this query? + // fill in range item + auto& range = issuedRanges[channelIndex]; + if (!range || range->GetEnd() != value) { + range = givenIdRange->AddChannelRanges(); + range->SetChannel(channelIndex); + range->SetBegin(value); } + range->SetEnd(value + 1); + } - // register issued ranges in agent and global records - TAgent& agent = GetAgent(ev->Recipient); - for (const auto& range : givenIdRange->GetChannelRanges()) { - agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd()); - Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd()); + // register issued ranges in agent and global records + TAgent& agent = GetAgent(ev->Recipient); + for (const auto& range : givenIdRange->GetChannelRanges()) { + agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd()); + Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd()); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "IssueNewRange", (Id, GetLogId()), - (AgentId, agent.Connection->NodeId), (Channel, range.GetChannel()), - (Begin, range.GetBegin()), (End, range.GetEnd())); - } + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "IssueNewRange", (Id, GetLogId()), + (AgentId, agent.Connection->NodeId), (Channel, range.GetChannel()), + (Begin, range.GetBegin()), (End, range.GetEnd())); } TActivationContext::Send(response.release()); @@ -246,46 +207,6 @@ namespace NKikimr::NBlobDepot { agent.InvalidatedStepInFlight.clear(); } - void TBlobDepot::InitChannelKinds() { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT07, "InitChannelKinds", (Id, GetLogId())); - - TTabletStorageInfo *info = Info(); - const ui32 generation = Executor()->Generation(); - - Y_VERIFY(Channels.empty()); - - ui32 channel = 0; - for (const auto& profile : Config.GetChannelProfiles()) { - for (ui32 i = 0, count = profile.GetCount(); i < count; ++i, ++channel) { - if (channel >= 2) { - const auto kind = profile.GetChannelKind(); - auto& p = ChannelKinds[kind]; - p.ChannelToIndex[channel] = p.ChannelGroups.size(); - p.ChannelGroups.emplace_back(channel, info->GroupFor(channel, generation)); - Channels.push_back({ - ui8(channel), - kind, - &p, - {}, - TBlobSeqId{channel, generation, 1, 0}.ToSequentialNumber(), - {}, - {}, - }); - } else { - Channels.push_back({ - ui8(channel), - NKikimrBlobDepot::TChannelKind::System, - nullptr, - {}, - 0, - {}, - {}, - }); - } - } - } - } - void TBlobDepot::Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { TAgent& agent = GetAgent(ev->Recipient); if (const auto it = agent.PushCallbacks.find(ev->Get()->Record.GetId()); it != agent.PushCallbacks.end()) { @@ -296,19 +217,19 @@ namespace NKikimr::NBlobDepot { } void TBlobDepot::ProcessRegisterAgentQ() { - if (!Configured || (Config.GetIsDecommittingGroup() && DecommitState < EDecommitState::BlocksFinished)) { + if (!ReadyForAgentQueries()) { return; } for (auto& [pipeServerId, info] : PipeServers) { - if (info.PostponeFromAgent) { - info.PostponeFromAgent = false; - for (auto& ev : std::exchange(info.PostponeQ, {})) { - Y_VERIFY(ev->Cookie == info.NextExpectedMsgId); - ++info.NextExpectedMsgId; - - TAutoPtr<IEventHandle> tmp(ev.release()); - HandleFromAgent(tmp); + if (info.ProcessThroughQueue) { + if (info.PostponeQ.empty()) { + info.ProcessThroughQueue = false; + } else { + for (auto& ev : std::exchange(info.PostponeQ, {})) { + TActivationContext::Send(ev.release()); + } + TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessRegisterAgentQ, 0, SelfId(), {}, nullptr, 0)); } } } diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index 7af0565cf99..ef444d09d28 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -16,6 +16,51 @@ namespace NKikimr::NBlobDepot { }; }; + class TAssimilator::TTxCommitAssimilatedBlob : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + const TActorId AssimilatorId; + const NKikimrProto::EReplyStatus Status; + const TBlobSeqId BlobSeqId; + const TData::TKey Key; + + public: + TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_COMMIT_ASSIMILATED_BLOB; } + + TTxCommitAssimilatedBlob(TAssimilator *self, NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId, TData::TKey key) + : TTransactionBase(self->Self) + , AssimilatorId(self->SelfId()) + , Status(status) + , BlobSeqId(blobSeqId) + , Key(std::move(key)) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + if (Status == NKikimrProto::OK) { + Y_VERIFY(!Self->Data->CanBeCollected(BlobSeqId)); + Self->Data->BindToBlob(Key, BlobSeqId, txc, this); + } else if (Status == NKikimrProto::NODATA) { + if (const TData::TValue *value = Self->Data->FindKey(Key); value && value->ValueChain.empty()) { + Self->Data->DeleteKey(Key, txc, this); + } + } + return true; + } + + void Complete(const TActorContext&) override { + if (BlobSeqId) { + TChannelInfo& channel = Self->Channels[BlobSeqId.Channel]; + const ui32 generation = Self->Executor()->Generation(); + const TBlobSeqId leastExpectedBlobIdBefore = channel.GetLeastExpectedBlobId(generation); + const size_t numErased = channel.AssimilatedBlobsInFlight.erase(BlobSeqId.ToSequentialNumber()); + Y_VERIFY(numErased == 1); + if (leastExpectedBlobIdBefore != channel.GetLeastExpectedBlobId(generation)) { + Self->Data->OnLeastExpectedBlobIdChange(channel.Index); // allow garbage collection + } + } + Self->Data->CommitTrash(this); + TActivationContext::Send(new IEventHandle(TEvPrivate::EvTxComplete, 0, AssimilatorId, {}, nullptr, 0)); + } + }; + void TAssimilator::Bootstrap() { if (Token.expired()) { return PassAway(); @@ -42,7 +87,9 @@ namespace NKikimr::NBlobDepot { } void TAssimilator::PassAway() { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT52, "TAssimilator::PassAway", (Id, Self->GetLogId())); + if (!Token.expired()) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT52, "TAssimilator::PassAway", (Id, Self->GetLogId())); + } TActorBootstrapped::PassAway(); } @@ -56,7 +103,6 @@ namespace NKikimr::NBlobDepot { hFunc(TEvents::TEvUndelivered, Handle); hFunc(TEvBlobStorage::TEvGetResult, Handle); hFunc(TEvBlobStorage::TEvPutResult, Handle); - hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle); hFunc(TEvTabletPipe::TEvClientConnected, Handle); hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); hFunc(TEvBlobStorage::TEvControllerGroupDecommittedResponse, Handle); @@ -222,16 +268,7 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT54, "TAssimilator::ScanDataForCopying", (Id, Self->GetLogId()), (LastScannedKey, LastScannedKey)); - TData::TKey lastScannedKey; - if (LastScannedKey) { - lastScannedKey = TData::TKey(*LastScannedKey); - } - - struct TScanQueueItem { - TLogoBlobID Key; - TLogoBlobID OriginalBlobId; - }; - std::deque<TScanQueueItem> scanQ; + std::deque<TLogoBlobID> scanQ; ui32 totalSize = 0; THPTimer timer; ui32 numItems = 0; @@ -243,13 +280,13 @@ namespace NKikimr::NBlobDepot { return false; } } - if (!value.OriginalBlobId) { - LastScannedKey.emplace(key.GetBlobId()); + const TLogoBlobID& id = key.GetBlobId(); + if (!value.ValueChain.empty()) { + LastScannedKey.emplace(id); return true; // keep scanning - } else if (const TLogoBlobID& id = *value.OriginalBlobId; scanQ.empty() || - scanQ.front().OriginalBlobId.TabletID() == id.TabletID()) { - LastScannedKey.emplace(key.GetBlobId()); - scanQ.push_back({.Key = *LastScannedKey, .OriginalBlobId = id}); + } else if (scanQ.empty() || scanQ.front().TabletID() == id.TabletID()) { + LastScannedKey.emplace(id); + scanQ.push_back(id); totalSize += id.BlobSize(); EntriesToProcess = true; return totalSize < MaxSizeToQuery; @@ -259,6 +296,7 @@ namespace NKikimr::NBlobDepot { }; // FIXME: reentrable as it shares mailbox with the BlobDepot tablet itself + TData::TKey lastScannedKey = LastScannedKey ? TData::TKey(*LastScannedKey) : TData::TKey(); const bool finished = Self->Data->ScanRange(LastScannedKey ? &lastScannedKey : nullptr, nullptr, {}, callback); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT56, "ScanDataForCopying done", (Id, Self->GetLogId()), @@ -270,8 +308,8 @@ namespace NKikimr::NBlobDepot { const ui32 sz = scanQ.size(); TArrayHolder<TQuery> queries(new TQuery[sz]); TQuery *query = queries.Get(); - for (const TScanQueueItem& item : scanQ) { - query->Set(item.OriginalBlobId); + for (const TLogoBlobID& id : scanQ) { + query->Set(id); ++query; } auto ev = std::make_unique<TEvBlobStorage::TEvGet>(queries, sz, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead); @@ -289,46 +327,28 @@ namespace NKikimr::NBlobDepot { } void TAssimilator::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { - class TTxDropBlobIfNoData : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - const TLogoBlobID Id; - const TActorId AssimilatorId; - - public: - TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_DROP_BLOB_IF_NODATA; } - - TTxDropBlobIfNoData(TBlobDepot *self, TLogoBlobID id, TActorId assimilatorId) - : TTransactionBase(self) - , Id(id) - , AssimilatorId(assimilatorId) - {} - - bool Execute(TTransactionContext& txc, const TActorContext&) override { - const TData::TKey key(Id); - if (const TData::TValue *v = Self->Data->FindKey(key); v && v->OriginalBlobId && - v->KeepState != NKikimrBlobDepot::EKeepState::Keep) { - Self->Data->DeleteKey(key, txc, this); - } - return true; - } - - void Complete(const TActorContext&) override { - Self->Data->CommitTrash(this); - TActivationContext::Send(new IEventHandle(TEvPrivate::EvTxComplete, 0, AssimilatorId, {}, nullptr, 0)); - } - }; - auto& msg = *ev->Get(); for (ui32 i = 0; i < msg.ResponseSz; ++i) { auto& resp = msg.Responses[i]; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT34, "got TEvGetResult", (Id, Self->GetLogId()), (BlobId, resp.Id), (Status, resp.Status), (NumPutsInFlight, NumPutsInFlight)); if (resp.Status == NKikimrProto::OK) { - auto ev = std::make_unique<TEvBlobStorage::TEvPut>(resp.Id, resp.Buffer, TInstant::Max()); - ev->Decommission = true; - SendToBSProxy(SelfId(), Self->Config.GetVirtualGroupId(), ev.release()); + std::vector<ui8> channels(1); + Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels); + TChannelInfo& channel = Self->Channels[channels.front()]; + const ui64 value = channel.NextBlobSeqId++; + const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value); + const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, resp.Id.BlobSize()); + const ui64 putId = NextPutId++; + SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, resp.Buffer, TInstant::Max()), putId); + const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing + Y_VERIFY(inserted); + const bool inserted1 = PutIdToKey.emplace(putId, TData::TKey(resp.Id)).second; + Y_VERIFY(inserted1); ++NumPutsInFlight; } else if (resp.Status == NKikimrProto::NODATA) { - Self->Execute(std::make_unique<TTxDropBlobIfNoData>(Self, resp.Id, SelfId())); + Self->Execute(std::make_unique<TTxCommitAssimilatedBlob>(this, NKikimrProto::NODATA, TBlobSeqId(), + TData::TKey(resp.Id))); ++NumPutsInFlight; } } @@ -345,20 +365,13 @@ namespace NKikimr::NBlobDepot { void TAssimilator::Handle(TEvBlobStorage::TEvPutResult::TPtr ev) { auto& msg = *ev->Get(); + const auto it = PutIdToKey.find(ev->Cookie); + Y_VERIFY(it != PutIdToKey.end()); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "got TEvPutResult", (Id, Self->GetLogId()), (Msg, msg), - (NumPutsInFlight, NumPutsInFlight)); - if (!--NumPutsInFlight) { - IssueCollects(); - } - } - - void TAssimilator::IssueCollects() { - // FIXME: do it - Action(); - } - - void TAssimilator::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) { - (void)ev; + (NumPutsInFlight, NumPutsInFlight), (Key, it->second)); + Self->Execute(std::make_unique<TTxCommitAssimilatedBlob>(this, msg.Status, TBlobSeqId::FromLogoBlobId(msg.Id), + std::move(it->second))); + PutIdToKey.erase(it); } void TAssimilator::OnCopyDone() { @@ -472,6 +485,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::StartGroupAssimilator() { if (Config.GetIsDecommittingGroup()) { Y_VERIFY(!GroupAssimilatorId); + Y_VERIFY(Data->IsLoaded()); GroupAssimilatorId = RegisterWithSameMailbox(new TGroupAssimilator(this)); } } diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h index 9ef88beb3c7..0cde2fc5246 100644 --- a/ydb/core/blob_depot/assimilator.h +++ b/ydb/core/blob_depot/assimilator.h @@ -2,6 +2,7 @@ #include "defs.h" #include "blob_depot_tablet.h" +#include "data.h" namespace NKikimr::NBlobDepot { @@ -29,6 +30,11 @@ namespace NKikimr::NBlobDepot { TActorId PipeId; + ui64 NextPutId = 1; + THashMap<ui64, TData::TKey> PutIdToKey; + + class TTxCommitAssimilatedBlob; + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::BLOB_DEPOT_ASSIMILATOR_ACTOR; @@ -54,8 +60,6 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobStorage::TEvGetResult::TPtr ev); void HandleTxComplete(); void Handle(TEvBlobStorage::TEvPutResult::TPtr ev); - void IssueCollects(); - void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev); void OnCopyDone(); void CreatePipe(); void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev); diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index af812228820..8c5e1627a2c 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -49,21 +49,24 @@ namespace NKikimr::NBlobDepot { auto handleFromAgentPipe = [this](auto& ev) { const auto it = PipeServers.find(ev->Recipient); Y_VERIFY(it != PipeServers.end()); + auto& info = it->second; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT69, "HandleFromAgentPipe", (Id, GetLogId()), (RequestId, ev->Cookie), - (Postpone, it->second.PostponeFromAgent), (Sender, ev->Sender), (PipeServerId, ev->Recipient)); - - if (it->second.PostponeFromAgent) { - it->second.PostponeQ.emplace_back(ev.Release()); - return; + (Sender, ev->Sender), (PipeServerId, ev->Recipient), (ProcessThroughQueue, info.ProcessThroughQueue), + (NextExpectedMsgId, info.NextExpectedMsgId), (PostponeQ.size, info.PostponeQ.size())); + + if (info.ProcessThroughQueue || !ReadyForAgentQueries()) { + info.PostponeQ.emplace_back(ev.Release()); + info.ProcessThroughQueue = true; + } else { + // ensure correct ordering of incoming messages + Y_VERIFY_S(ev->Cookie == info.NextExpectedMsgId, "message reordering detected Cookie# " << ev->Cookie + << " NextExpectedMsgId# " << info.NextExpectedMsgId << " Type# " << Sprintf("%08" PRIx32, + ev->GetTypeRewrite()) << " Id# " << GetLogId()); + ++info.NextExpectedMsgId; + + HandleFromAgent(ev); } - - Y_VERIFY_S(ev->Cookie == it->second.NextExpectedMsgId, "pipe reordering detected Cookie# " << ev->Cookie - << " NextExpectedMsgId# " << it->second.NextExpectedMsgId << " Type# " << Sprintf("%08" PRIx32, - ev->GetTypeRewrite()) << " Id# " << GetLogId()); - - ++it->second.NextExpectedMsgId; - HandleFromAgent(ev); }; switch (const ui32 type = ev->GetTypeRewrite()) { @@ -81,6 +84,8 @@ namespace NKikimr::NBlobDepot { fFunc(TEvBlobDepot::EvPushNotifyResult, handleFromAgentPipe); fFunc(TEvBlobDepot::EvCollectGarbage, handleFromAgentPipe); + cFunc(TEvPrivate::EvProcessRegisterAgentQ, ProcessRegisterAgentQ); + hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle); hFunc(TEvBlobStorage::TEvRangeResult, Data->Handle); hFunc(TEvBlobStorage::TEvGetResult, Data->UncertaintyResolver->Handle); @@ -116,6 +121,103 @@ namespace NKikimr::NBlobDepot { TActor::PassAway(); } + void TBlobDepot::InitChannelKinds() { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT07, "InitChannelKinds", (Id, GetLogId())); + + TTabletStorageInfo *info = Info(); + const ui32 generation = Executor()->Generation(); + + Y_VERIFY(Channels.empty()); + + ui32 channel = 0; + for (const auto& profile : Config.GetChannelProfiles()) { + for (ui32 i = 0, count = profile.GetCount(); i < count; ++i, ++channel) { + const ui32 groupId = info->GroupFor(channel, generation); + if (channel >= 2) { + const auto kind = profile.GetChannelKind(); + auto& p = ChannelKinds[kind]; + p.ChannelToIndex[channel] = p.ChannelGroups.size(); + p.ChannelGroups.emplace_back(channel, groupId); + Channels.push_back({ + ui8(channel), + groupId, + kind, + &p, + {}, + TBlobSeqId{channel, generation, 1, 0}.ToSequentialNumber(), + {}, + {}, + {}, + }); + Groups[groupId].Channels[kind].push_back(channel); + } else { + Channels.push_back({ + ui8(channel), + groupId, + NKikimrBlobDepot::TChannelKind::System, + nullptr, + {}, + 0, + {}, + {}, + {}, + }); + } + } + } + } + + void TBlobDepot::InvalidateGroupForAllocation(ui32 groupId) { + const auto groupIt = Groups.find(groupId); + Y_VERIFY(groupIt != Groups.end()); + const auto& group = groupIt->second; + for (const auto& [kind, channels] : group.Channels) { + const auto kindIt = ChannelKinds.find(kind); + Y_VERIFY(kindIt != ChannelKinds.end()); + auto& kindv = kindIt->second; + kindv.GroupAccumWeights.clear(); // invalidate + } + } + + void TBlobDepot::PickChannels(NKikimrBlobDepot::TChannelKind::E kind, std::vector<ui8>& channels) { + const auto kindIt = ChannelKinds.find(kind); + Y_VERIFY(kindIt != ChannelKinds.end()); + auto& kindv = kindIt->second; + + if (kindv.GroupAccumWeights.empty()) { + // recalculate group weights + ui64 accum = 0; + THashSet<ui32> seenGroups; + for (const auto& [channel, groupId] : kindv.ChannelGroups) { + if (const auto& [_, inserted] = seenGroups.insert(groupId); inserted) { + accum += SpaceMonitor->GetGroupAllocationWeight(groupId); + kindv.GroupAccumWeights.emplace_back(groupId, accum); + } + } + Y_VERIFY(!kindv.GroupAccumWeights.empty()); + } + + const auto [_, accum] = kindv.GroupAccumWeights.back(); + for (ui8& channel : channels) { + const ui64 random = RandomNumber(accum); + const auto comp = [](ui64 x, const auto& y) { return x < std::get<1>(y); }; + const auto it = std::upper_bound(kindv.GroupAccumWeights.begin(), kindv.GroupAccumWeights.end(), random, comp); + Y_VERIFY(it != kindv.GroupAccumWeights.end()); + const auto [groupId, _] = *it; + + const auto groupIt = Groups.find(groupId); + Y_VERIFY(groupIt != Groups.end()); + auto& group = groupIt->second; + + const auto channelsIt = group.Channels.find(kind); + Y_VERIFY(channelsIt != group.Channels.end()); + const auto& channels = channelsIt->second; + + const size_t channelIndex = RandomNumber(channels.size()); + channel = channels[channelIndex]; + } + } + IActor *CreateBlobDepot(const TActorId& tablet, TTabletStorageInfo *info) { return new TBlobDepot(tablet, info); } diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index 98b62f4f1f8..e229220259d 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -25,6 +25,7 @@ namespace NKikimr::NBlobDepot { EvCommitCertainKeys, EvDoGroupMetricsExchange, EvKickSpaceMonitor, + EvProcessRegisterAgentQ, }; }; @@ -81,28 +82,35 @@ namespace NKikimr::NBlobDepot { std::optional<ui32> NodeId; // as reported by RegisterAgent ui64 NextExpectedMsgId = 1; std::deque<std::unique_ptr<IEventHandle>> PostponeQ; - bool PostponeFromAgent = false; + bool ProcessThroughQueue = false; }; THashMap<TActorId, TPipeServerContext> PipeServers; THashMap<ui32, TAgent> Agents; // NodeId -> Agent + struct TChannelKind : NBlobDepot::TChannelKind { + std::vector<std::tuple<ui32, ui64>> GroupAccumWeights; // last one is the total weight + }; + THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds; struct TChannelInfo { ui8 Index; + ui32 GroupId; NKikimrBlobDepot::TChannelKind::E ChannelKind; TChannelKind *KindPtr; TGivenIdRange GivenIdRanges; // accumulated through all agents ui64 NextBlobSeqId = 0; std::set<ui64> SequenceNumbersInFlight; // of blobs being committed + std::set<ui64> AssimilatedBlobsInFlight; std::optional<TBlobSeqId> LastReportedLeastId; // Obtain the least BlobSeqId that is not yet committed, but may be written by any agent TBlobSeqId GetLeastExpectedBlobId(ui32 generation) { const auto result = TBlobSeqId::FromSequentalNumber(Index, generation, Min(NextBlobSeqId, GivenIdRanges.IsEmpty() ? Max<ui64>() : GivenIdRanges.GetMinimumValue(), - SequenceNumbersInFlight.empty() ? Max<ui64>() : *SequenceNumbersInFlight.begin())); + SequenceNumbersInFlight.empty() ? Max<ui64>() : *SequenceNumbersInFlight.begin(), + AssimilatedBlobsInFlight.empty() ? Max<ui64>() : *AssimilatedBlobsInFlight.begin())); // this value can't decrease, because it may lead to data loss Y_VERIFY_S(!LastReportedLeastId || *LastReportedLeastId <= result, "decreasing LeastExpectedBlobId" @@ -110,7 +118,8 @@ namespace NKikimr::NBlobDepot { << " result# " << result.ToString() << " NextBlobSeqId# " << NextBlobSeqId << " GivenIdRanges# " << GivenIdRanges.ToString() - << " SequenceNumbersInFlight# " << FormatList(SequenceNumbersInFlight)); + << " SequenceNumbersInFlight# " << FormatList(SequenceNumbersInFlight) + << " AssimilatedBlobsInFlight# " << FormatList(AssimilatedBlobsInFlight)); LastReportedLeastId.emplace(result); return result; } @@ -118,6 +127,7 @@ namespace NKikimr::NBlobDepot { std::vector<TChannelInfo> Channels; struct TGroupInfo { + THashMap<NKikimrBlobDepot::TChannelKind::E, std::vector<ui8>> Channels; ui64 AllocatedBytes = 0; }; THashMap<ui32, TGroupInfo> Groups; @@ -136,6 +146,10 @@ namespace NKikimr::NBlobDepot { void ProcessRegisterAgentQ(); + bool ReadyForAgentQueries() const { + return Configured && (!Config.GetIsDecommittingGroup() || DecommitState >= EDecommitState::BlocksFinished); + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// void Enqueue(TAutoPtr<IEventHandle>& ev, const TActorContext&) override { @@ -182,6 +196,8 @@ namespace NKikimr::NBlobDepot { void PassAway() override; void InitChannelKinds(); + void InvalidateGroupForAllocation(ui32 groupId); + void PickChannels(NKikimrBlobDepot::TChannelKind::E kind, std::vector<ui8>& channels); TString GetLogId() const { const auto *executor = Executor(); diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index 3bc3899d79a..ae62f4d6bdd 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -194,6 +194,7 @@ namespace NKikimr::NBlobDepot { void TData::UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "UpdateKey", (Id, Self->GetLogId()), (Key, key), (Item, item)); + Y_VERIFY(Loaded || IsKeyLoaded(key)); UpdateKey(key, txc, cookie, "UpdateKey", [&](TValue& value, bool inserted) { if (!inserted) { // update value items value.Meta = item.GetMeta(); @@ -205,15 +206,30 @@ namespace NKikimr::NBlobDepot { auto *chain = value.ValueChain.Add(); auto *locator = chain->MutableLocator(); locator->CopyFrom(item.GetBlobLocator()); - - // reset original blob id, if any - value.OriginalBlobId.reset(); } return EUpdateOutcome::CHANGE; }, item); } + void TData::BindToBlob(const TKey& key, TBlobSeqId blobSeqId, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "BindToBlob", (Id, Self->GetLogId()), (Key, key), (BlobSeqId, blobSeqId)); + Y_VERIFY(Loaded || IsKeyLoaded(key)); + UpdateKey(key, txc, cookie, "BindToBlob", [&](TValue& value, bool inserted) { + if (!value.ValueChain.empty()) { + return EUpdateOutcome::NO_CHANGE; + } else { + auto *chain = value.ValueChain.Add(); + auto *locator = chain->MutableLocator(); + locator->SetGroupId(Self->Info()->GroupFor(blobSeqId.Channel, blobSeqId.Generation)); + blobSeqId.ToProto(locator->MutableBlobSeqId()); + locator->SetTotalDataLen(key.GetBlobId().BlobSize()); + locator->SetFooterLen(0); + return inserted ? EUpdateOutcome::DROP : EUpdateOutcome::CHANGE; + } + }); + } + void TData::MakeKeyCertain(const TKey& key) { const auto it = Data.find(key); Y_VERIFY(it != Data.end()); @@ -321,25 +337,23 @@ namespace NKikimr::NBlobDepot { ValidateRecords(); } - void TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, + bool TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { - UpdateKey(TKey(blob.Id), txc, cookie, "AddDataOnDecommit", [&](TValue& value, bool inserted) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "AddDataOnDecommit", (Id, Self->GetLogId()), (Blob, blob), - (Value, value), (Inserted, inserted)); + Y_VERIFY(Loaded || IsKeyLoaded(TKey(blob.Id))); + + return UpdateKey(TKey(blob.Id), txc, cookie, "AddDataOnDecommit", [&](TValue& value, bool inserted) { + bool change = inserted; // update keep state if necessary if (blob.DoNotKeep && value.KeepState < EKeepState::DoNotKeep) { value.KeepState = EKeepState::DoNotKeep; + change = true; } else if (blob.Keep && value.KeepState < EKeepState::Keep) { value.KeepState = EKeepState::Keep; + change = true; } - // if there is not value chain for this blob, map it to the original blob id - if (value.ValueChain.empty()) { - value.OriginalBlobId = blob.Id; - } - - return EUpdateOutcome::CHANGE; + return change ? EUpdateOutcome::CHANGE : EUpdateOutcome::NO_CHANGE; }); } @@ -359,8 +373,8 @@ namespace NKikimr::NBlobDepot { record.LastConfirmedGenStep = confirmedGenStep; } - bool TData::UpdateKeepState(TKey key, EKeepState keepState, - NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { + bool TData::UpdateKeepState(TKey key, EKeepState keepState, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { + Y_VERIFY(Loaded || IsKeyLoaded(key)); return UpdateKey(std::move(key), txc, cookie, "UpdateKeepState", [&](TValue& value, bool inserted) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT51, "UpdateKeepState", (Id, Self->GetLogId()), (Key, key), (KeepState, keepState), (Value, value)); diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 1f00909bcda..024ae6c0136 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -223,7 +223,6 @@ namespace NKikimr::NBlobDepot { TValueChain ValueChain; EKeepState KeepState = EKeepState::Default; bool Public = false; - std::optional<TLogoBlobID> OriginalBlobId; bool UncertainWrite = false; TValue() = default; @@ -238,9 +237,6 @@ namespace NKikimr::NBlobDepot { , ValueChain(std::move(*proto.MutableValueChain())) , KeepState(proto.GetKeepState()) , Public(proto.GetPublic()) - , OriginalBlobId(proto.HasOriginalBlobId() - ? std::make_optional(LogoBlobIDFromLogoBlobID(proto.GetOriginalBlobId())) - : std::nullopt) , UncertainWrite(uncertainWrite) {} @@ -277,9 +273,6 @@ namespace NKikimr::NBlobDepot { if (Public != proto->GetPublic()) { proto->SetPublic(Public); } - if (OriginalBlobId) { - LogoBlobIDFromLogoBlobID(*OriginalBlobId, proto->MutableOriginalBlobId()); - } } static bool Validate(const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item); @@ -305,7 +298,6 @@ namespace NKikimr::NBlobDepot { << " ValueChain# " << FormatList(ValueChain) << " KeepState# " << EKeepState_Name(KeepState) << " Public# " << (Public ? "true" : "false") - << " OriginalBlobId# " << (OriginalBlobId ? OriginalBlobId->ToString() : "<none>") << " UncertainWrite# " << (UncertainWrite ? "true" : "false") << "}"; } @@ -363,8 +355,9 @@ namespace NKikimr::NBlobDepot { struct TResolveDecommitContext { TEvBlobDepot::TEvResolve::TPtr Ev; // original resolve request - ui32 NumRangesInFlight = 0; - bool Errors = false; + ui32 NumRangesInFlight; + std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs = {}; + TIntervalMap<TLogoBlobID> Errors = {}; }; ui64 LastRangeId = 0; THashMap<ui64, TResolveDecommitContext> ResolveDecommitContexts; @@ -446,6 +439,8 @@ namespace NKikimr::NBlobDepot { void UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); + void BindToBlob(const TKey& key, TBlobSeqId blobSeqId, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); + void MakeKeyCertain(const TKey& key); void HandleCommitCertainKeys(); @@ -454,7 +449,7 @@ namespace NKikimr::NBlobDepot { void AddLoadSkip(TKey key); void AddDataOnLoad(TKey key, TString value, bool uncertainWrite, bool skip); - void AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, + bool AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); void AddTrashOnLoad(TLogoBlobID id); void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep); diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp index bfffd2dba53..ed11f6ba471 100644 --- a/ydb/core/blob_depot/data_resolve.cpp +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -7,6 +7,13 @@ namespace NKikimr::NBlobDepot { using TData = TBlobDepot::TData; + namespace { + TLogoBlobID BlobIdUpperBound(TLogoBlobID id) { + return TLogoBlobID(id.TabletID(), id.Generation(), id.Step(), id.Channel(), TLogoBlobID::MaxBlobSize, + id.Cookie(), TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode); + } + } + TData::TResolveResultAccumulator::TResolveResultAccumulator(TEventHandle<TEvBlobDepot::TEvResolve>& ev) : Sender(ev.Sender) , Recipient(ev.Recipient) @@ -83,6 +90,10 @@ namespace NKikimr::NBlobDepot { class TData::TTxResolve : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { std::unique_ptr<TEvBlobDepot::TEvResolve::THandle> Request; + std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs; + TIntervalMap<TLogoBlobID> Errors; + + bool KeysLoaded = false; int ItemIndex = 0; std::optional<TKey> LastScannedKey; ui32 NumKeysRead = 0; // number of keys already read for this item @@ -95,15 +106,22 @@ namespace NKikimr::NBlobDepot { public: TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_RESOLVE; } - TTxResolve(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr request) + TTxResolve(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr request, + std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob>&& decommitBlobs = {}, + TIntervalMap<TLogoBlobID>&& errors = {}) : TTransactionBase(self) , Request(request.Release()) + , DecommitBlobs(std::move(decommitBlobs)) + , Errors(std::move(errors)) , Result(*Request) {} TTxResolve(TTxResolve& predecessor) : TTransactionBase(predecessor.Self) , Request(std::move(predecessor.Request)) + , DecommitBlobs(std::move(predecessor.DecommitBlobs)) + , Errors(std::move(predecessor.Errors)) + , KeysLoaded(predecessor.KeysLoaded) , ItemIndex(predecessor.ItemIndex) , LastScannedKey(std::move(predecessor.LastScannedKey)) , NumKeysRead(predecessor.NumKeysRead) @@ -148,18 +166,71 @@ namespace NKikimr::NBlobDepot { } bool Execute(TTransactionContext& txc, const TActorContext&) override { - NIceDb::TNiceDb db(txc.DB); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT22, "TTxResolve::Execute", (Id, Self->GetLogId()), (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex), - (LastScannedKey, LastScannedKey)); + (LastScannedKey, LastScannedKey), (DecommitBlobs.size, DecommitBlobs.size())); + + bool progress = false; + if (!KeysLoaded && !LoadKeys(txc, progress)) { + return progress; + } else { + KeysLoaded = true; + } + + for (ui32 numItemsRemain = 10'000; !DecommitBlobs.empty(); DecommitBlobs.pop_front()) { + if (numItemsRemain) { + numItemsRemain -= Self->Data->AddDataOnDecommit(DecommitBlobs.front(), txc, this); + } else { + SuccessorTx = true; + return true; + } + } + + const auto& record = Request->Get()->Record; + for (const auto& item : record.GetItems()) { + std::optional<ui64> cookie = item.HasCookie() ? std::make_optional(item.GetCookie()) : std::nullopt; + + std::optional<TKey> begin; + std::optional<TKey> end; + TScanFlags flags; + ui64 maxKeys; + const bool success = GetScanParams(item, &begin, &end, &flags, &maxKeys); + Y_VERIFY_DEBUG(success); + + // we have everything we need contained in memory, generate response from memory + auto callback = [&](const TKey& key, const TValue& value) { + IssueResponseItem(cookie, key, value); + return --maxKeys != 0; + }; + Self->Data->ScanRange(begin ? &begin.value() : nullptr, end ? &end.value() : nullptr, flags, callback); + } + + return true; + } + + void Complete(const TActorContext&) override { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT30, "TTxResolve::Complete", (Id, Self->GetLogId()), + (Sender, Request->Sender), (Cookie, Request->Cookie), (SuccessorTx, SuccessorTx), + (Uncertainties.size, Uncertainties.size())); + + Self->Data->CommitTrash(this); + + if (SuccessorTx) { + Self->Execute(std::make_unique<TTxResolve>(*this)); + } else if (Uncertainties.empty()) { + Result.Send(NKikimrProto::OK, std::nullopt); + } else { + Self->Data->UncertaintyResolver->PushResultWithUncertainties(std::move(Result), std::move(Uncertainties)); + } + } + + bool LoadKeys(TTransactionContext& txc, bool& progress) { + NIceDb::TNiceDb db(txc.DB); - bool progress = false; // have we made some progress during scan? const auto& record = Request->Get()->Record; const auto& items = record.GetItems(); for (; ItemIndex < items.size(); ++ItemIndex, LastScannedKey.reset(), NumKeysRead = 0) { const auto& item = items[ItemIndex]; - std::optional<ui64> cookie = item.HasCookie() ? std::make_optional(item.GetCookie()) : std::nullopt; std::optional<TKey> begin; std::optional<TKey> end; @@ -180,12 +251,6 @@ namespace NKikimr::NBlobDepot { } if (Self->Data->Loaded || (end && Self->Data->LastLoadedKey && *end <= *Self->Data->LastLoadedKey)) { - // we have everything we need contained in memory, generate response from memory - auto callback = [&](const TKey& key, const TValue& value) { - IssueResponseItem(cookie, key, value); - return ++NumKeysRead != maxKeys; - }; - Self->Data->ScanRange(begin ? &begin.value() : nullptr, end ? &end.value() : nullptr, flags, callback); continue; } @@ -193,8 +258,7 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(!end || *Self->Data->LastLoadedKey < *end); // special case -- forward scan and we have some data in memory - auto callback = [&](const TKey& key, const TValue& value) { - IssueResponseItem(cookie, key, value); + auto callback = [&](const TKey& key, const TValue& /*value*/) { LastScannedKey = key; return ++NumKeysRead != maxKeys; }; @@ -206,7 +270,7 @@ namespace NKikimr::NBlobDepot { flags &= ~EScanFlags::INCLUDE_BEGIN; // check if we have read all the keys requested - if (NumKeysRead == maxKeys) { + if (maxKeys && NumKeysRead == maxKeys) { continue; } } @@ -234,7 +298,6 @@ namespace NKikimr::NBlobDepot { if (matchBegin && matchEnd) { const TValue *value = Self->Data->FindKey(key); Y_VERIFY(value); // value must exist as it was just loaded into memory and exists in the database - IssueResponseItem(cookie, key, *value); if (++NumKeysRead == maxKeys) { // we have hit the MaxItems limit, exit return true; @@ -277,20 +340,6 @@ namespace NKikimr::NBlobDepot { return true; } - void Complete(const TActorContext&) override { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT30, "TTxResolve::Complete", (Id, Self->GetLogId()), - (Sender, Request->Sender), (Cookie, Request->Cookie), (SuccessorTx, SuccessorTx), - (Uncertainties.size, Uncertainties.size())); - - if (SuccessorTx) { - Self->Execute(std::make_unique<TTxResolve>(*this)); - } else if (Uncertainties.empty()) { - Result.Send(NKikimrProto::OK, std::nullopt); - } else { - Self->Data->UncertaintyResolver->PushResultWithUncertainties(std::move(Result), std::move(Uncertainties)); - } - } - void IssueResponseItem(std::optional<ui64> cookie, const TKey& key, const TValue& value) { NKikimrBlobDepot::TEvResolveResult::TResolvedKey item; @@ -298,31 +347,40 @@ namespace NKikimr::NBlobDepot { item.SetCookie(*cookie); } item.SetKey(key.MakeBinaryKey()); - EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](const TLogoBlobID& id, ui32 begin, ui32 end) { - if (begin != end) { - auto *out = item.AddValueChain(); - out->SetGroupId(Self->Info()->GroupFor(id.Channel(), id.Generation())); - LogoBlobIDFromLogoBlobID(id, out->MutableBlobId()); - if (begin) { - out->SetSubrangeBegin(begin); - } - if (end != id.BlobSize()) { - out->SetSubrangeEnd(end); - } - } - }); - if (value.OriginalBlobId) { - Y_VERIFY(item.GetValueChain().empty()); + if (value.ValueChain.empty() && Self->Config.GetIsDecommittingGroup()) { + Y_VERIFY(!value.UncertainWrite); auto *out = item.AddValueChain(); out->SetGroupId(Self->Config.GetVirtualGroupId()); - LogoBlobIDFromLogoBlobID(*value.OriginalBlobId, out->MutableBlobId()); - Y_VERIFY(!value.UncertainWrite); + LogoBlobIDFromLogoBlobID(key.GetBlobId(), out->MutableBlobId()); + } else { + EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](const TLogoBlobID& id, ui32 begin, ui32 end) { + if (begin != end) { + auto *out = item.AddValueChain(); + out->SetGroupId(Self->Info()->GroupFor(id.Channel(), id.Generation())); + LogoBlobIDFromLogoBlobID(id, out->MutableBlobId()); + if (begin) { + out->SetSubrangeBegin(begin); + } + if (end != id.BlobSize()) { + out->SetSubrangeEnd(end); + } + } + }); } if (value.Meta) { item.SetMeta(value.Meta.data(), value.Meta.size()); } - if (!item.ValueChainSize()) { + bool foundError = false; + + if (Errors) { + const TLogoBlobID id = key.GetBlobId(); + foundError = TIntervalSet<TLogoBlobID>(id, BlobIdUpperBound(id)).IsSubsetOf(Errors); + } + + if (foundError) { + item.SetErrorReason("item resolution error"); + } else if (!item.ValueChainSize()) { STLOG(PRI_WARN, BLOB_DEPOT, BDT48, "empty ValueChain on Resolve", (Id, Self->GetLogId()), (Key, key), (Value, value), (Item, item), (Sender, Request->Sender), (Cookie, Request->Cookie)); } @@ -389,7 +447,7 @@ namespace NKikimr::NBlobDepot { } if (minId == maxId) { const auto it = Data.find(TKey(minId)); - if (it != Data.end() && (!it->second.ValueChain.empty() || it->second.OriginalBlobId)) { + if (it != Data.end() && !it->second.ValueChain.empty()) { continue; // fast path for extreme queries } } @@ -404,12 +462,8 @@ namespace NKikimr::NBlobDepot { TInstant::Max(), true); ev->Decommission = true; - const auto& tabletId_ = tabletId; - const auto& minId_ = minId; - const auto& maxId_ = maxId; - const auto& mustRestoreFirst_ = mustRestoreFirst; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "going to TEvRange", (Id, Self->GetLogId()), (TabletId, tabletId_), - (MinId, minId_), (MaxId, maxId_), (MustRestoreFirst, mustRestoreFirst_)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "going to TEvRange", (Id, Self->GetLogId()), (TabletId, tabletId), + (MinId, minId), (MaxId, maxId), (MustRestoreFirst, mustRestoreFirst), (Cookie, id)); SendToBSProxy(Self->SelfId(), Self->Config.GetVirtualGroupId(), ev.release(), id); } ResolveDecommitContexts[id] = {ev, (ui32)queries.size()}; @@ -421,54 +475,27 @@ namespace NKikimr::NBlobDepot { } void TData::Handle(TEvBlobStorage::TEvRangeResult::TPtr ev) { - class TTxCommitRange : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - TEvBlobStorage::TEvRangeResult::TPtr Ev; - - public: - TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_COMMIT_RANGE; } - - TTxCommitRange(TBlobDepot *self, TEvBlobStorage::TEvRangeResult::TPtr ev) - : TTransactionBase(self) - , Ev(ev) - {} - - bool Execute(TTransactionContext& txc, const TActorContext&) override { - if (Ev->Get()->Status == NKikimrProto::OK) { - for (const auto& response : Ev->Get()->Responses) { - Self->Data->AddDataOnDecommit({ - .Id = response.Id, - .Keep = response.Keep, - .DoNotKeep = response.DoNotKeep - }, txc, this); - } - } - return true; - } + auto& msg = *ev->Get(); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT50, "TEvRangeResult", (Id, Self->GetLogId()), (Msg, msg), (Cookie, ev->Cookie)); - void Complete(const TActorContext&) override { - Self->Data->CommitTrash(this); + auto& contexts = Self->Data->ResolveDecommitContexts; + if (const auto it = contexts.find(ev->Cookie); it != contexts.end()) { + auto& context = it->second; - auto& contexts = Self->Data->ResolveDecommitContexts; - if (const auto it = contexts.find(Ev->Cookie); it != contexts.end()) { - TResolveDecommitContext& context = it->second; - if (Ev->Get()->Status != NKikimrProto::OK) { - context.Errors = true; - } - if (!--context.NumRangesInFlight) { - if (context.Errors) { - auto [response, record] = TEvBlobDepot::MakeResponseFor(*context.Ev, NKikimrProto::ERROR, - "errors in range queries"); - TActivationContext::Send(response.release()); - } else { - Self->Execute(std::make_unique<TTxResolve>(Self, context.Ev)); - } - } - contexts.erase(it); + if (msg.Status == NKikimrProto::OK) { + for (const auto& response : msg.Responses) { + context.DecommitBlobs.push_back({response.Id, response.Keep, response.DoNotKeep}); } + } else { + context.Errors.Add(msg.From, BlobIdUpperBound(msg.To)); } - }; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT50, "TEvRangeResult", (Id, Self->GetLogId()), (Msg, *ev->Get())); - Self->Execute(std::make_unique<TTxCommitRange>(Self, ev)); + + if (!--context.NumRangesInFlight) { + Self->Execute(std::make_unique<TTxResolve>(Self, context.Ev, std::move(context.DecommitBlobs), + std::move(context.Errors))); + contexts.erase(it); + } + } } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h index 6e201abe902..809281a0e85 100644 --- a/ydb/core/blob_depot/defs.h +++ b/ydb/core/blob_depot/defs.h @@ -14,6 +14,7 @@ #include <ydb/core/tablet_flat/flat_cxx_database.h> #include <ydb/core/protos/blob_depot.pb.h> #include <ydb/core/protos/counters_blob_depot.pb.h> +#include <ydb/core/util/interval_set.h> #include <ydb/core/util/format.h> #include <ydb/core/util/stlog.h> diff --git a/ydb/core/blob_depot/space_monitor.cpp b/ydb/core/blob_depot/space_monitor.cpp index 27d77a4a7b0..2809d3a4c79 100644 --- a/ydb/core/blob_depot/space_monitor.cpp +++ b/ydb/core/blob_depot/space_monitor.cpp @@ -22,6 +22,7 @@ namespace NKikimr::NBlobDepot { if (msg.Status == NKikimrProto::OK) { group.StatusFlags = msg.StatusFlags; group.ApproximateFreeSpaceShare = msg.ApproximateFreeSpaceShare; + Self->InvalidateGroupForAllocation(groupId); } } diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index 069de64e6a4..72714424d63 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -69,6 +69,15 @@ namespace NKikimr::NBlobDepot { }; } + static TBlobSeqId FromLogoBlobId(TLogoBlobID id) { + return TBlobSeqId{ + id.Channel(), + id.Generation(), + id.Step(), + IndexFromCookie(id.Cookie()) + }; + } + void ToProto(NKikimrBlobDepot::TBlobSeqId *proto) const { proto->SetChannel(Channel); proto->SetGeneration(Generation); @@ -94,6 +103,14 @@ namespace NKikimr::NBlobDepot { Y_FAIL(); } + + static ui32 IndexFromCookie(ui32 cookie) { + static constexpr ui32 typeBits = 24 - IndexBits; + const auto type = static_cast<EBlobType>(cookie & ((1 << typeBits) - 1)); + Y_VERIFY(type == EBlobType::VG_COMPOSITE_BLOB || type == EBlobType::VG_DATA_BLOB || + type == EBlobType::VG_FOOTER_BLOB || type == EBlobType::VG_GC_BLOB); + return cookie >> typeBits; + } }; class TGivenIdRange { diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h index 8a9b182e959..2b671db14a2 100644 --- a/ydb/core/mind/bscontroller/impl.h +++ b/ydb/core/mind/bscontroller/impl.h @@ -1502,7 +1502,7 @@ private: } TVSlotInfo* FindVSlot(TVDiskID id) { // GroupGeneration may be zero - if (TGroupInfo *group = FindGroup(id.GroupID)) { + if (TGroupInfo *group = FindGroup(id.GroupID); group && !group->VDisksInGroup.empty()) { const ui32 index = group->Topology->GetOrderNumber(id); const TVSlotInfo *slot = group->VDisksInGroup[index]; Y_VERIFY(slot->GetShortVDiskId() == TVDiskIdShort(id)); // sanity check diff --git a/ydb/core/mind/bscontroller/load_everything.cpp b/ydb/core/mind/bscontroller/load_everything.cpp index d370d27a66b..a6d75e057f7 100644 --- a/ydb/core/mind/bscontroller/load_everything.cpp +++ b/ydb/core/mind/bscontroller/load_everything.cpp @@ -200,6 +200,9 @@ public: std::get<2>(geom)); group.DecommitStatus = groups.GetValueOrDefault<T::DecommitStatus>(); + if (group.DecommitStatus == NKikimrBlobStorage::TGroupDecommitStatus::DONE) { + group.VDisksInGroup.clear(); + } #define OPTIONAL(NAME) \ if (groups.HaveValue<T::NAME>()) { \ diff --git a/ydb/core/protos/counters_blob_depot.proto b/ydb/core/protos/counters_blob_depot.proto index 698768b083d..16225814a81 100644 --- a/ydb/core/protos/counters_blob_depot.proto +++ b/ydb/core/protos/counters_blob_depot.proto @@ -42,7 +42,7 @@ enum EPercentileCounters { enum ETxTypes { TXTYPE_PUT_ASSIMILATED_DATA = 0 [(NKikimr.TxTypeOpts) = {Name: "TTxPutAssimilatedData"}]; - TXTYPE_DROP_BLOB_IF_NODATA = 1 [(NKikimr.TxTypeOpts) = {Name: "TTxDropBlobIfNoData"}]; + TXTYPE_COMMIT_ASSIMILATED_BLOB = 1 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitAssimilatedBlob"}]; TXTYPE_FINISH_COPYING = 2 [(NKikimr.TxTypeOpts) = {Name: "TTxFinishCopying"}]; TXTYPE_FINISH_DECOMMISSION = 3 [(NKikimr.TxTypeOpts) = {Name: "TTxFinishDecommission"}]; TXTYPE_MON_DATA = 4 [(NKikimr.TxTypeOpts) = {Name: "TTxMonData"}]; @@ -53,9 +53,8 @@ enum ETxTypes { TXTYPE_CONFIRM_GC = 9 [(NKikimr.TxTypeOpts) = {Name: "TTxConfirmGC"}]; TXTYPE_COMMIT_CERTAIN_KEYS = 10 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitCertainKeys"}]; TXTYPE_RESOLVE = 11 [(NKikimr.TxTypeOpts) = {Name: "TTxResolve"}]; - TXTYPE_COMMIT_RANGE = 12 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitRange"}]; - TXTYPE_DATA_LOAD = 13 [(NKikimr.TxTypeOpts) = {Name: "TTxDataLoad"}]; - TXTYPE_COLLECT_GARBAGE = 14 [(NKikimr.TxTypeOpts) = {Name: "TTxCollectGarbage"}]; - TXTYPE_COMMIT_BLOB_SEQ = 15 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitBlobSeq"}]; - TXTYPE_UPDATE_BLOCK = 16 [(NKikimr.TxTypeOpts) = {Name: "TTxUpdateBlock"}]; + TXTYPE_DATA_LOAD = 12 [(NKikimr.TxTypeOpts) = {Name: "TTxDataLoad"}]; + TXTYPE_COLLECT_GARBAGE = 13 [(NKikimr.TxTypeOpts) = {Name: "TTxCollectGarbage"}]; + TXTYPE_COMMIT_BLOB_SEQ = 14 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitBlobSeq"}]; + TXTYPE_UPDATE_BLOCK = 15 [(NKikimr.TxTypeOpts) = {Name: "TTxUpdateBlock"}]; } |