diff options
author | alexvru <alexvru@ydb.tech> | 2022-12-02 10:59:00 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-12-02 10:59:00 +0300 |
commit | 567ea46fac457b40c5b618839857ef1772af38c6 (patch) | |
tree | 080d66a52485ce881a34de7dc813175611f3cade | |
parent | 4dafd91976dfadc522dc2edee34eeb89e185cac7 (diff) | |
download | ydb-567ea46fac457b40c5b618839857ef1772af38c6.tar.gz |
Harden result checking and fix collection and commit race problem
-rw-r--r-- | ydb/core/blob_depot/agent.cpp | 45 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_discover.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_get.cpp | 32 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_range.cpp | 14 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_tablet.h | 23 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 77 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 7 | ||||
-rw-r--r-- | ydb/core/blob_depot/given_id_range.cpp | 5 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_commit_blob_seq.cpp | 80 | ||||
-rw-r--r-- | ydb/core/blob_depot/types.h | 11 |
10 files changed, 184 insertions, 111 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index 1e9e54febc..b3836650aa 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -226,11 +226,14 @@ namespace NKikimr::NBlobDepot { } // calculate if this agent can be blocking garbage collection by holding least conserved blob sequence id - const bool unblock = Channels[channel].GivenIdRanges.GetMinimumValue() == agentGivenIdRange.GetMinimumValue(); + auto& givenIdRanges = Channels[channel].GivenIdRanges; + const bool unblock = givenIdRanges.GetMinimumValue() == agentGivenIdRange.GetMinimumValue(); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT06, "ResetAgent", (Id, GetLogId()), (AgentId, agent.Connection->NodeId), - (Channel, int(channel)), (GivenIdRanges, Channels[channel].GivenIdRanges), - (Agent.GivenIdRanges, agentGivenIdRange), (Unblock, unblock)); + (Channel, int(channel)), (GivenIdRanges, givenIdRanges), (Agent.GivenIdRanges, agentGivenIdRange), + (Unblock, unblock)); + + givenIdRanges.Subtract(std::exchange(agentGivenIdRange, {})); if (unblock) { Data->OnLeastExpectedBlobIdChange(channel); @@ -261,6 +264,8 @@ namespace NKikimr::NBlobDepot { &p, {}, TBlobSeqId{channel, generation, 1, 0}.ToSequentialNumber(), + {}, + {}, }); } else { Channels.push_back({ @@ -268,7 +273,9 @@ namespace NKikimr::NBlobDepot { NKikimrBlobDepot::TChannelKind::System, nullptr, {}, - 0 + 0, + {}, + {}, }); } } @@ -276,32 +283,12 @@ namespace NKikimr::NBlobDepot { } void TBlobDepot::Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { - class TTxInvokeCallback : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - const ui32 NodeId; - TEvBlobDepot::TEvPushNotifyResult::TPtr Ev; - - public: - TTxInvokeCallback(TBlobDepot *self, ui32 nodeId, TEvBlobDepot::TEvPushNotifyResult::TPtr ev) - : TTransactionBase(self) - , NodeId(nodeId) - , Ev(ev) - {} - - bool Execute(TTransactionContext& /*txc*/, const TActorContext&) override { - TAgent& agent = Self->GetAgent(NodeId); - if (const auto it = agent.PushCallbacks.find(Ev->Get()->Record.GetId()); it != agent.PushCallbacks.end()) { - auto callback = std::move(it->second); - agent.PushCallbacks.erase(it); - callback(Ev); - } - return true; - } - - void Complete(const TActorContext&) override {} - }; - TAgent& agent = GetAgent(ev->Recipient); - Execute(std::make_unique<TTxInvokeCallback>(this, agent.Connection->NodeId, ev)); + if (const auto it = agent.PushCallbacks.find(ev->Get()->Record.GetId()); it != agent.PushCallbacks.end()) { + auto callback = std::move(it->second); + agent.PushCallbacks.erase(it); + callback(ev); + } } void TBlobDepot::ProcessRegisterAgentQ() { diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index a65c558f78..7b52f1be96 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -162,6 +162,7 @@ namespace NKikimr::NBlobDepot { void CheckIfDone() { if (DoneWithBlockedGeneration && DoneWithData) { + Y_VERIFY_S(!Id || !ReadBody || Buffer.size() == Id.BlobSize(), "Id# " << Id << " Buffer.size# " << Buffer.size()); EndWithSuccess(Id ? std::make_unique<TEvBlobStorage::TEvDiscoverResult>(Id, MinGeneration, Buffer, BlockedGeneration) : std::make_unique<TEvBlobStorage::TEvDiscoverResult>(NKikimrProto::NODATA, MinGeneration, BlockedGeneration)); diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index 5a66a5853f..7b011d2d52 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -62,6 +62,8 @@ namespace NKikimr::NBlobDepot { (QueryId, GetQueryId()), (QueryIdx, i), (BlobId, query.Id)); } } + + CheckAndFinish(); } bool ProcessSingleResult(ui32 queryIdx, const TResolvedValueChain *value, const std::optional<TString>& errorReason) { @@ -69,6 +71,7 @@ namespace NKikimr::NBlobDepot { (QueryId, GetQueryId()), (QueryIdx, queryIdx), (Value, value), (ErrorReason, errorReason)); auto& r = Response->Responses[queryIdx]; + Y_VERIFY(r.Status == NKikimrProto::UNKNOWN); if (errorReason) { r.Status = NKikimrProto::ERROR; --AnswersRemain; @@ -95,20 +98,33 @@ namespace NKikimr::NBlobDepot { return false; } } - if (!AnswersRemain) { - EndWithSuccess(std::move(Response)); - return false; - } return true; } void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString buffer) override { auto& resp = Response->Responses[tag]; + Y_VERIFY(resp.Status == NKikimrProto::UNKNOWN); resp.Status = status; if (status == NKikimrProto::OK) { resp.Buffer = std::move(buffer); } - if (!--AnswersRemain) { + --AnswersRemain; + CheckAndFinish(); + } + + void CheckAndFinish() { + if (!AnswersRemain) { + if (!Request.IsIndexOnly) { + for (size_t i = 0, count = Response->ResponseSz; i < count; ++i) { + const auto& item = Response->Responses[i]; + if (item.Status == NKikimrProto::OK) { + Y_VERIFY_S(item.Buffer.size() == item.RequestedSize ? Min(item.RequestedSize, + item.Id.BlobSize() - Min(item.Id.BlobSize(), item.Shift)) : item.Id.BlobSize(), + "Id# " << item.Id << " Shift# " << item.Shift << " RequestedSize# " << item.RequestedSize + << " Buffer.size# " << item.Buffer.size()); + } + } + } EndWithSuccess(std::move(Response)); } } @@ -116,14 +132,14 @@ namespace NKikimr::NBlobDepot { void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override { if (auto *p = std::get_if<TKeyResolved>(&response)) { ProcessSingleResult(context->Obtain<TResolveKeyContext>().QueryIdx, p->ValueChain, p->ErrorReason); + CheckAndFinish(); } else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) { Agent.HandleGetResult(context, **p); } else if (std::holds_alternative<TTabletDisconnected>(response)) { if (auto *resolveContext = dynamic_cast<TResolveKeyContext*>(context.get())) { Response->Responses[resolveContext->QueryIdx].Status = NKikimrProto::ERROR; - if (!--AnswersRemain) { - EndWithSuccess(std::move(Response)); - } + --AnswersRemain; + CheckAndFinish(); } } else { Y_FAIL(); diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index 5c889ff675..db6c462214 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -128,9 +128,7 @@ namespace NKikimr::NBlobDepot { if (msg.GetStatus() == NKikimrProto::OVERRUN) { Agent.RegisterRequest(id, this, std::move(context), {}, true); } else if (msg.GetStatus() == NKikimrProto::OK) { - if (!ReadsInFlight && !ResolvesInFlight) { - EndWithSuccess(std::move(Response)); - } + CheckAndFinish(); } else { Y_UNREACHABLE(); } @@ -154,7 +152,17 @@ namespace NKikimr::NBlobDepot { << item.Id << " Error# " << dataOrErrorReason); } + CheckAndFinish(); + } + + void CheckAndFinish() { if (!ReadsInFlight && !ResolvesInFlight) { + if (!Request.IsIndexOnly) { + for (const auto& response : Response->Responses) { + Y_VERIFY_S(response.Buffer.size() == response.Id.BlobSize(), "Id# " << response.Id + << " Buffer.size# " << response.Buffer.size()); + } + } EndWithSuccess(std::move(Response)); } } diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index 40295e2bc2..a2c21afbfc 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -92,11 +92,24 @@ namespace NKikimr::NBlobDepot { TChannelKind *KindPtr; TGivenIdRange GivenIdRanges; // accumulated through all agents ui64 NextBlobSeqId = 0; - - // Obtain the least BlobSeqId that is not yet confirmed, but may be written by any agent - TBlobSeqId GetLeastExpectedBlobId(ui32 generation) const { - return TBlobSeqId::FromSequentalNumber(Index, generation, GivenIdRanges.IsEmpty() ? NextBlobSeqId : - GivenIdRanges.GetMinimumValue()); + std::set<ui64> SequenceNumbersInFlight; // of blobs being committed + std::optional<TBlobSeqId> LastReportedLeastId; + + // Obtain the least BlobSeqId that is not yet committed, but may be written by any agent + TBlobSeqId GetLeastExpectedBlobId(ui32 generation) { + const auto result = TBlobSeqId::FromSequentalNumber(Index, generation, Min(NextBlobSeqId, + GivenIdRanges.IsEmpty() ? Max<ui64>() : GivenIdRanges.GetMinimumValue(), + SequenceNumbersInFlight.empty() ? Max<ui64>() : *SequenceNumbersInFlight.begin())); + // this value can't decrease, because it may lead to data loss + Y_VERIFY_S(!LastReportedLeastId || *LastReportedLeastId <= result, + "decreasing LeastExpectedBlobId" + << " LastReportedLeastId# " << LastReportedLeastId->ToString() + << " result# " << result.ToString() + << " NextBlobSeqId# " << NextBlobSeqId + << " GivenIdRanges# " << GivenIdRanges.ToString() + << " SequenceNumbersInFlight# " << FormatList(SequenceNumbersInFlight)); + LastReportedLeastId.emplace(result); + return result; } }; std::vector<TChannelInfo> Channels; diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index e6d99c7f41..2808646c15 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -357,17 +357,17 @@ namespace NKikimr::NBlobDepot { } std::sort(writesInFlight.begin(), writesInFlight.end()); - for (const auto& [channel, invalidatedStep] : items) { - const ui32 channel_ = channel; - auto& agentGivenIdRanges = agent.GivenIdRanges[channel]; - auto& givenIdRanges = Self->Channels[channel].GivenIdRanges; + for (const auto& [channelIndex, invalidatedStep] : items) { + auto& channel = Self->Channels[channelIndex]; + auto& agentGivenIdRanges = agent.GivenIdRanges[channelIndex]; + auto& givenIdRanges = channel.GivenIdRanges; - auto begin = std::lower_bound(writesInFlight.begin(), writesInFlight.end(), TBlobSeqId{channel, 0, 0, 0}); + auto begin = std::lower_bound(writesInFlight.begin(), writesInFlight.end(), TBlobSeqId{channelIndex, 0, 0, 0}); auto makeWritesInFlight = [&] { TStringStream s; s << "["; - for (auto it = begin; it != writesInFlight.end() && it->Channel == channel_; ++it) { + for (auto it = begin; it != writesInFlight.end() && it->Channel == channel.Index; ++it) { s << (it != begin ? " " : "") << it->ToString(); } s << "]"; @@ -375,33 +375,30 @@ namespace NKikimr::NBlobDepot { }; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT13, "Trim", (Id, Self->GetLogId()), (AgentId, agent.Connection->NodeId), - (Id, ev->Cookie), (Channel, channel_), (InvalidatedStep, invalidatedStep), - (GivenIdRanges, Self->Channels[channel].GivenIdRanges), - (Agent.GivenIdRanges, agent.GivenIdRanges[channel]), + (Id, ev->Cookie), (Channel, channelIndex), (InvalidatedStep, invalidatedStep), + (GivenIdRanges, channel.GivenIdRanges), + (Agent.GivenIdRanges, agent.GivenIdRanges[channelIndex]), (WritesInFlight, makeWritesInFlight())); // sanity check -- ensure that current writes in flight would be conserved when processing garbage - for (auto it = begin; it != writesInFlight.end() && it->Channel == channel; ++it) { + for (auto it = begin; it != writesInFlight.end() && it->Channel == channelIndex; ++it) { Y_VERIFY_S(agentGivenIdRanges.GetPoint(it->ToSequentialNumber()), "blobSeqId# " << it->ToString()); Y_VERIFY_S(givenIdRanges.GetPoint(it->ToSequentialNumber()), "blobSeqId# " << it->ToString()); } - const TBlobSeqId leastExpectedBlobIdBefore = Self->Channels[channel].GetLeastExpectedBlobId(generation); + const TBlobSeqId leastExpectedBlobIdBefore = channel.GetLeastExpectedBlobId(generation); - const TBlobSeqId trimmedBlobSeqId{channel, generation, invalidatedStep, TBlobSeqId::MaxIndex}; + const TBlobSeqId trimmedBlobSeqId{channelIndex, generation, invalidatedStep, TBlobSeqId::MaxIndex}; const ui64 validSince = trimmedBlobSeqId.ToSequentialNumber() + 1; givenIdRanges.Subtract(agentGivenIdRanges.Trim(validSince)); - for (auto it = begin; it != writesInFlight.end() && it->Channel == channel; ++it) { + for (auto it = begin; it != writesInFlight.end() && it->Channel == channelIndex; ++it) { agentGivenIdRanges.AddPoint(it->ToSequentialNumber()); givenIdRanges.AddPoint(it->ToSequentialNumber()); } - const TBlobSeqId leastExpectedBlobIdAfter = Self->Channels[channel].GetLeastExpectedBlobId(generation); - Y_VERIFY(leastExpectedBlobIdBefore <= leastExpectedBlobIdAfter); - - if (leastExpectedBlobIdBefore != leastExpectedBlobIdAfter) { - OnLeastExpectedBlobIdChange(channel); + if (channel.GetLeastExpectedBlobId(generation) != leastExpectedBlobIdBefore) { + OnLeastExpectedBlobIdChange(channelIndex); } } } @@ -448,7 +445,8 @@ namespace NKikimr::NBlobDepot { } } - bool TData::CanBeCollected(ui32 groupId, TBlobSeqId id) const { + bool TData::CanBeCollected(TBlobSeqId id) const { + const ui32 groupId = Self->Info()->GroupFor(id.Channel, id.Generation); const auto it = RecordsPerChannelGroup.find(std::make_tuple(id.Channel, groupId)); return it != RecordsPerChannelGroup.end() && TGenStep(id) <= it->second.IssuedGenStep; } @@ -496,4 +494,45 @@ namespace NKikimr::NBlobDepot { } } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + bool TData::BeginCommittingBlobSeqId(TAgent& agent, TBlobSeqId blobSeqId) { + const ui32 generation = Self->Executor()->Generation(); + if (blobSeqId.Generation != generation) { + return false; + } + + Y_VERIFY(blobSeqId.Channel < Self->Channels.size()); + auto& channel = Self->Channels[blobSeqId.Channel]; + const ui64 value = blobSeqId.ToSequentialNumber(); + Y_VERIFY_S(agent.GivenIdRanges[blobSeqId.Channel].GetPoint(value), "BlobSeqId# " << blobSeqId.ToString() << " Id# " << Self->GetLogId()); + Y_VERIFY_S(channel.GivenIdRanges.GetPoint(value), " BlobSeqId# " << blobSeqId.ToString() << " Id# " << Self->GetLogId()); + const bool inserted = channel.SequenceNumbersInFlight.insert(value).second; + Y_VERIFY(inserted); + + return true; + } + + void TData::EndCommittingBlobSeqId(TAgent& agent, TBlobSeqId blobSeqId) { + + Y_VERIFY(blobSeqId.Channel < Self->Channels.size()); + auto& channel = Self->Channels[blobSeqId.Channel]; + + const ui32 generation = Self->Executor()->Generation(); + const auto leastExpectedBlobIdBefore = channel.GetLeastExpectedBlobId(generation); + + const size_t numErased = channel.SequenceNumbersInFlight.erase(blobSeqId.ToSequentialNumber()); + Y_VERIFY(numErased == 1); + + const ui64 value = blobSeqId.ToSequentialNumber(); + if (channel.GivenIdRanges.GetPoint(value)) { // if not set, it must have been trimmed by the agent during transaction (or even reset) + agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value); + channel.GivenIdRanges.RemovePoint(value); + } + + if (channel.GetLeastExpectedBlobId(generation) != leastExpectedBlobIdBefore) { + OnLeastExpectedBlobIdChange(blobSeqId.Channel); + } + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 95f8ffe315..21574f2228 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -463,7 +463,7 @@ namespace NKikimr::NBlobDepot { void AddFirstMentionedBlob(TLogoBlobID id); void AccountBlob(TLogoBlobID id, bool add); - bool CanBeCollected(ui32 groupId, TBlobSeqId id) const; + bool CanBeCollected(TBlobSeqId id) const; void OnLeastExpectedBlobIdChange(ui8 channel); @@ -498,6 +498,11 @@ namespace NKikimr::NBlobDepot { void RenderMainPage(IOutputStream& s); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + bool BeginCommittingBlobSeqId(TAgent& agent, TBlobSeqId blobSeqId); + void EndCommittingBlobSeqId(TAgent& agent, TBlobSeqId blobSeqId); + private: void ExecuteIssueGC(ui8 channel, ui32 groupId, TGenStep issuedGenStep, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage, ui64 cookie); diff --git a/ydb/core/blob_depot/given_id_range.cpp b/ydb/core/blob_depot/given_id_range.cpp index fd11e50156..b92864df28 100644 --- a/ydb/core/blob_depot/given_id_range.cpp +++ b/ydb/core/blob_depot/given_id_range.cpp @@ -37,15 +37,12 @@ namespace NKikimr::NBlobDepot { ++NumAvailableItems; } - void TGivenIdRange::RemovePoint(ui64 value, bool *wasLeast) { + void TGivenIdRange::RemovePoint(ui64 value) { const ui64 key = value / BitsPerChunk; const auto it = Ranges.find(key); Y_VERIFY(it != Ranges.end()); TChunk& chunk = it->second; const size_t offset = value % BitsPerChunk; - if (wasLeast) { - *wasLeast = it == Ranges.begin() && chunk.FirstNonZeroBit() == offset; - } chunk.Reset(offset); --NumAvailableItems; if (chunk.Empty()) { diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index a7fb2c255d..11524247f6 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -8,19 +8,37 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev) { class TTxCommitBlobSeq : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { const ui32 NodeId; + const ui64 AgentInstanceId; std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> Request; std::unique_ptr<IEventHandle> Response; + std::vector<TBlobSeqId> BlobSeqIds; public: - TTxCommitBlobSeq(TBlobDepot *self, ui32 nodeId, std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> request) + TTxCommitBlobSeq(TBlobDepot *self, TAgent& agent, std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> request) : TTransactionBase(self) - , NodeId(nodeId) + , NodeId(agent.Connection->NodeId) + , AgentInstanceId(*agent.AgentInstanceId) , Request(std::move(request)) - {} + { + const ui32 generation = Self->Executor()->Generation(); + for (const auto& item : Request->Get()->Record.GetItems()) { + if (TData::TValue::Validate(item) && !item.GetCommitNotify()) { + const auto blobSeqId = TBlobSeqId::FromProto(item.GetBlobLocator().GetBlobSeqId()); + if (Self->Data->CanBeCollected(blobSeqId)) { + // check for internal sanity -- we can't issue barriers on given ids without confirmed trimming + Y_VERIFY_S(blobSeqId.Generation < generation, "committing trimmed BlobSeqId" + << " BlobSeqId# " << blobSeqId.ToString() + << " Id# " << Self->GetLogId()); + } else if (Self->Data->BeginCommittingBlobSeqId(agent, blobSeqId)) { + BlobSeqIds.push_back(blobSeqId); + } + } + } + } bool Execute(TTransactionContext& txc, const TActorContext&) override { TAgent& agent = Self->GetAgent(NodeId); - if (!agent.Connection) { // agent disconnected while transaction was in queue -- drop this request + if (!agent.Connection || agent.AgentInstanceId != AgentInstanceId) { // agent disconnected while transaction was in queue -- drop this request return true; } @@ -36,8 +54,6 @@ namespace NKikimr::NBlobDepot { const ui32 generation = Self->Executor()->Generation(); for (const auto& item : Request->Get()->Record.GetItems()) { - auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config); - auto *responseItem = responseRecord->AddItems(); if (!TData::TValue::Validate(item)) { responseItem->SetStatus(NKikimrProto::ERROR); @@ -49,15 +65,14 @@ namespace NKikimr::NBlobDepot { responseItem->SetStatus(NKikimrProto::OK); const auto blobSeqId = TBlobSeqId::FromProto(blobLocator.GetBlobSeqId()); - const bool canBeCollected = Self->Data->CanBeCollected(blobLocator.GetGroupId(), blobSeqId); + const bool canBeCollected = Self->Data->CanBeCollected(blobSeqId); + + auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT68, "TTxCommitBlobSeq process key", (Id, Self->GetLogId()), (Key, key), (Item, item), (CanBeCollected, canBeCollected), (Generation, generation)); - if (blobSeqId.Generation == generation) { - // check for internal sanity -- we can't issue barriers on given ids without confirmed trimming - Y_VERIFY_S(!canBeCollected || item.GetCommitNotify(), "BlobSeqId# " << blobSeqId.ToString()); - } else if (canBeCollected) { + if (canBeCollected) { // we can't accept this record, because it is potentially under already issued barrier responseItem->SetStatus(NKikimrProto::ERROR); responseItem->SetErrorReason("generation race"); @@ -84,12 +99,6 @@ namespace NKikimr::NBlobDepot { } } else { Self->Data->UpdateKey(key, item, txc, this); - if (blobSeqId.Generation == generation) { - // mark given blob as committed only when it was issued in current generation -- only for this - // generation we have correct GivenIdRanges; and we can do this only after updating key as the - // callee function may trigger garbage collection - MarkGivenIdCommitted(agent, blobSeqId); - } } } @@ -119,24 +128,6 @@ namespace NKikimr::NBlobDepot { return success; } - void MarkGivenIdCommitted(TAgent& agent, const TBlobSeqId& blobSeqId) { - Y_VERIFY(blobSeqId.Channel < Self->Channels.size()); - - const ui64 value = blobSeqId.ToSequentialNumber(); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT18, "MarkGivenIdCommitted", (Id, Self->GetLogId()), - (AgentId, agent.Connection->NodeId), (BlobSeqId, blobSeqId), (Value, value), - (GivenIdRanges, Self->Channels[blobSeqId.Channel].GivenIdRanges), - (Agent.GivenIdRanges, agent.GivenIdRanges[blobSeqId.Channel])); - - agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value, nullptr); - - bool wasLeast; - Self->Channels[blobSeqId.Channel].GivenIdRanges.RemovePoint(value, &wasLeast); - if (wasLeast) { - Self->Data->OnLeastExpectedBlobIdChange(blobSeqId.Channel); - } - } - bool CheckKeyAgainstBarrier(const TData::TKey& key, TString *error) { const auto& v = key.AsVariant(); if (const auto *id = std::get_if<TLogoBlobID>(&v)) { @@ -159,13 +150,16 @@ namespace NKikimr::NBlobDepot { } void Complete(const TActorContext&) override { + TAgent& agent = Self->GetAgent(NodeId); + for (const TBlobSeqId blobSeqId : BlobSeqIds) { + Self->Data->EndCommittingBlobSeqId(agent, blobSeqId); + } Self->Data->CommitTrash(this); TActivationContext::Send(Response.release()); } }; - TAgent& agent = GetAgent(ev->Recipient); - Execute(std::make_unique<TTxCommitBlobSeq>(this, agent.Connection->NodeId, + Execute(std::make_unique<TTxCommitBlobSeq>(this, GetAgent(ev->Recipient), std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle>(ev.Release()))); } @@ -182,11 +176,15 @@ namespace NKikimr::NBlobDepot { const auto blobSeqId = TBlobSeqId::FromProto(item); if (blobSeqId.Generation == generation) { Y_VERIFY(blobSeqId.Channel < Channels.size()); + auto& channel = Channels[blobSeqId.Channel]; + + const TBlobSeqId leastExpectedBlobIdBefore = channel.GetLeastExpectedBlobId(generation); + const ui64 value = blobSeqId.ToSequentialNumber(); - agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value, nullptr); - bool wasLeast; - Channels[blobSeqId.Channel].GivenIdRanges.RemovePoint(value, &wasLeast); - if (wasLeast) { + agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value); + Channels[blobSeqId.Channel].GivenIdRanges.RemovePoint(value); + + if (channel.GetLeastExpectedBlobId(generation) != leastExpectedBlobIdBefore) { Data->OnLeastExpectedBlobIdChange(blobSeqId.Channel); } } diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index 1b926f5f87..2add0298c2 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -106,7 +106,7 @@ namespace NKikimr::NBlobDepot { public: void IssueNewRange(ui64 begin, ui64 end); void AddPoint(ui64 value); - void RemovePoint(ui64 value, bool *wasLeast); + void RemovePoint(ui64 value); bool GetPoint(ui64 value) const; bool IsEmpty() const; @@ -122,6 +122,15 @@ namespace NKikimr::NBlobDepot { std::vector<bool> ToDebugArray(size_t numItems) const; void CheckConsistency() const; + + template<typename T> + void ForEach(T&& callback) const { + for (const auto& [index, chunk] : Ranges) { + Y_FOR_EACH_BIT(offset, chunk) { + callback(index * BitsPerChunk + offset); + } + } + } }; using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>; |