about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author alexvru <alexvru@ydb.tech> 2022-12-02 10:59:00 +0300
committer alexvru <alexvru@ydb.tech> 2022-12-02 10:59:00 +0300
commit 567ea46fac457b40c5b618839857ef1772af38c6 (patch)
tree 080d66a52485ce881a34de7dc813175611f3cade
parent 4dafd91976dfadc522dc2edee34eeb89e185cac7 (diff)
download ydb-567ea46fac457b40c5b618839857ef1772af38c6.tar.gz
Harden result checking and fix collection and commit race problem
-rw-r--r-- ydb/core/blob_depot/agent.cpp 45
-rw-r--r-- ydb/core/blob_depot/agent/storage_discover.cpp 1
-rw-r--r-- ydb/core/blob_depot/agent/storage_get.cpp 32
-rw-r--r-- ydb/core/blob_depot/agent/storage_range.cpp 14
-rw-r--r-- ydb/core/blob_depot/blob_depot_tablet.h 23
-rw-r--r-- ydb/core/blob_depot/data.cpp 77
-rw-r--r-- ydb/core/blob_depot/data.h 7
-rw-r--r-- ydb/core/blob_depot/given_id_range.cpp 5
-rw-r--r-- ydb/core/blob_depot/op_commit_blob_seq.cpp 80
-rw-r--r-- ydb/core/blob_depot/types.h 11
10 files changed, 184 insertions, 111 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index 1e9e54febc..b3836650aa 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -226,11 +226,14 @@ namespace NKikimr::NBlobDepot {
}
// calculate if this agent can be blocking garbage collection by holding least conserved blob sequence id
- const bool unblock = Channels[channel].GivenIdRanges.GetMinimumValue() == agentGivenIdRange.GetMinimumValue();
+ auto& givenIdRanges = Channels[channel].GivenIdRanges;
+ const bool unblock = givenIdRanges.GetMinimumValue() == agentGivenIdRange.GetMinimumValue();
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT06, "ResetAgent", (Id, GetLogId()), (AgentId, agent.Connection->NodeId),
- (Channel, int(channel)), (GivenIdRanges, Channels[channel].GivenIdRanges),
- (Agent.GivenIdRanges, agentGivenIdRange), (Unblock, unblock));
+ (Channel, int(channel)), (GivenIdRanges, givenIdRanges), (Agent.GivenIdRanges, agentGivenIdRange),
+ (Unblock, unblock));
+
+ givenIdRanges.Subtract(std::exchange(agentGivenIdRange, {}));
if (unblock) {
Data->OnLeastExpectedBlobIdChange(channel);
@@ -261,6 +264,8 @@ namespace NKikimr::NBlobDepot {
&p,
{},
TBlobSeqId{channel, generation, 1, 0}.ToSequentialNumber(),
+ {},
+ {},
});
} else {
Channels.push_back({
@@ -268,7 +273,9 @@ namespace NKikimr::NBlobDepot {
NKikimrBlobDepot::TChannelKind::System,
nullptr,
{},
- 0
+ 0,
+ {},
+ {},
});
}
}
@@ -276,32 +283,12 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepot::Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
- class TTxInvokeCallback : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- const ui32 NodeId;
- TEvBlobDepot::TEvPushNotifyResult::TPtr Ev;
-
- public:
- TTxInvokeCallback(TBlobDepot *self, ui32 nodeId, TEvBlobDepot::TEvPushNotifyResult::TPtr ev)
- : TTransactionBase(self)
- , NodeId(nodeId)
- , Ev(ev)
- {}
-
- bool Execute(TTransactionContext& /*txc*/, const TActorContext&) override {
- TAgent& agent = Self->GetAgent(NodeId);
- if (const auto it = agent.PushCallbacks.find(Ev->Get()->Record.GetId()); it != agent.PushCallbacks.end()) {
- auto callback = std::move(it->second);
- agent.PushCallbacks.erase(it);
- callback(Ev);
- }
- return true;
- }
-
- void Complete(const TActorContext&) override {}
- };
-
TAgent& agent = GetAgent(ev->Recipient);
- Execute(std::make_unique<TTxInvokeCallback>(this, agent.Connection->NodeId, ev));
+ if (const auto it = agent.PushCallbacks.find(ev->Get()->Record.GetId()); it != agent.PushCallbacks.end()) {
+ auto callback = std::move(it->second);
+ agent.PushCallbacks.erase(it);
+ callback(ev);
+ }
}
void TBlobDepot::ProcessRegisterAgentQ() {
diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp
index a65c558f78..7b52f1be96 100644
--- a/ydb/core/blob_depot/agent/storage_discover.cpp
+++ b/ydb/core/blob_depot/agent/storage_discover.cpp
@@ -162,6 +162,7 @@ namespace NKikimr::NBlobDepot {
void CheckIfDone() {
if (DoneWithBlockedGeneration && DoneWithData) {
+ Y_VERIFY_S(!Id || !ReadBody || Buffer.size() == Id.BlobSize(), "Id# " << Id << " Buffer.size# " << Buffer.size());
EndWithSuccess(Id
? std::make_unique<TEvBlobStorage::TEvDiscoverResult>(Id, MinGeneration, Buffer, BlockedGeneration)
: std::make_unique<TEvBlobStorage::TEvDiscoverResult>(NKikimrProto::NODATA, MinGeneration, BlockedGeneration));
diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp
index 5a66a5853f..7b011d2d52 100644
--- a/ydb/core/blob_depot/agent/storage_get.cpp
+++ b/ydb/core/blob_depot/agent/storage_get.cpp
@@ -62,6 +62,8 @@ namespace NKikimr::NBlobDepot {
(QueryId, GetQueryId()), (QueryIdx, i), (BlobId, query.Id));
}
}
+
+ CheckAndFinish();
}
bool ProcessSingleResult(ui32 queryIdx, const TResolvedValueChain *value, const std::optional<TString>& errorReason) {
@@ -69,6 +71,7 @@ namespace NKikimr::NBlobDepot {
(QueryId, GetQueryId()), (QueryIdx, queryIdx), (Value, value), (ErrorReason, errorReason));
auto& r = Response->Responses[queryIdx];
+ Y_VERIFY(r.Status == NKikimrProto::UNKNOWN);
if (errorReason) {
r.Status = NKikimrProto::ERROR;
--AnswersRemain;
@@ -95,20 +98,33 @@ namespace NKikimr::NBlobDepot {
return false;
}
}
- if (!AnswersRemain) {
- EndWithSuccess(std::move(Response));
- return false;
- }
return true;
}
void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString buffer) override {
auto& resp = Response->Responses[tag];
+ Y_VERIFY(resp.Status == NKikimrProto::UNKNOWN);
resp.Status = status;
if (status == NKikimrProto::OK) {
resp.Buffer = std::move(buffer);
}
- if (!--AnswersRemain) {
+ --AnswersRemain;
+ CheckAndFinish();
+ }
+
+ void CheckAndFinish() { // completes the query via EndWithSuccess once AnswersRemain drops to zero
+ if (!AnswersRemain) {
+ if (!Request.IsIndexOnly) { // payload size is only checked when the request is not index-only
+ for (size_t i = 0, count = Response->ResponseSz; i < count; ++i) {
+ const auto& item = Response->Responses[i];
+ if (item.Status == NKikimrProto::OK) { // expected size below is parenthesized: '==' binds tighter than '?:'
+ Y_VERIFY_S(item.Buffer.size() == (item.RequestedSize ? Min(item.RequestedSize,
+ item.Id.BlobSize() - Min(item.Id.BlobSize(), item.Shift)) : item.Id.BlobSize()),
+ "Id# " << item.Id << " Shift# " << item.Shift << " RequestedSize# " << item.RequestedSize
+ << " Buffer.size# " << item.Buffer.size());
+ }
+ }
+ }
EndWithSuccess(std::move(Response));
}
}
@@ -116,14 +132,14 @@ namespace NKikimr::NBlobDepot {
void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override {
if (auto *p = std::get_if<TKeyResolved>(&response)) {
ProcessSingleResult(context->Obtain<TResolveKeyContext>().QueryIdx, p->ValueChain, p->ErrorReason);
+ CheckAndFinish();
} else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) {
Agent.HandleGetResult(context, **p);
} else if (std::holds_alternative<TTabletDisconnected>(response)) {
if (auto *resolveContext = dynamic_cast<TResolveKeyContext*>(context.get())) {
Response->Responses[resolveContext->QueryIdx].Status = NKikimrProto::ERROR;
- if (!--AnswersRemain) {
- EndWithSuccess(std::move(Response));
- }
+ --AnswersRemain;
+ CheckAndFinish();
}
} else {
Y_FAIL();
diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp
index 5c889ff675..db6c462214 100644
--- a/ydb/core/blob_depot/agent/storage_range.cpp
+++ b/ydb/core/blob_depot/agent/storage_range.cpp
@@ -128,9 +128,7 @@ namespace NKikimr::NBlobDepot {
if (msg.GetStatus() == NKikimrProto::OVERRUN) {
Agent.RegisterRequest(id, this, std::move(context), {}, true);
} else if (msg.GetStatus() == NKikimrProto::OK) {
- if (!ReadsInFlight && !ResolvesInFlight) {
- EndWithSuccess(std::move(Response));
- }
+ CheckAndFinish();
} else {
Y_UNREACHABLE();
}
@@ -154,7 +152,17 @@ namespace NKikimr::NBlobDepot {
<< item.Id << " Error# " << dataOrErrorReason);
}
+ CheckAndFinish();
+ }
+
+ void CheckAndFinish() {
if (!ReadsInFlight && !ResolvesInFlight) {
+ if (!Request.IsIndexOnly) {
+ for (const auto& response : Response->Responses) {
+ Y_VERIFY_S(response.Buffer.size() == response.Id.BlobSize(), "Id# " << response.Id
+ << " Buffer.size# " << response.Buffer.size());
+ }
+ }
EndWithSuccess(std::move(Response));
}
}
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index 40295e2bc2..a2c21afbfc 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -92,11 +92,24 @@ namespace NKikimr::NBlobDepot {
TChannelKind *KindPtr;
TGivenIdRange GivenIdRanges; // accumulated through all agents
ui64 NextBlobSeqId = 0;
-
- // Obtain the least BlobSeqId that is not yet confirmed, but may be written by any agent
- TBlobSeqId GetLeastExpectedBlobId(ui32 generation) const {
- return TBlobSeqId::FromSequentalNumber(Index, generation, GivenIdRanges.IsEmpty() ? NextBlobSeqId :
- GivenIdRanges.GetMinimumValue());
+ std::set<ui64> SequenceNumbersInFlight; // of blobs being committed
+ std::optional<TBlobSeqId> LastReportedLeastId;
+
+ // Obtain the least BlobSeqId that is not yet committed, but may be written by any agent
+ TBlobSeqId GetLeastExpectedBlobId(ui32 generation) {
+ const auto result = TBlobSeqId::FromSequentalNumber(Index, generation, Min(NextBlobSeqId,
+ GivenIdRanges.IsEmpty() ? Max<ui64>() : GivenIdRanges.GetMinimumValue(),
+ SequenceNumbersInFlight.empty() ? Max<ui64>() : *SequenceNumbersInFlight.begin()));
+ // this value can't decrease, because it may lead to data loss
+ Y_VERIFY_S(!LastReportedLeastId || *LastReportedLeastId <= result,
+ "decreasing LeastExpectedBlobId"
+ << " LastReportedLeastId# " << LastReportedLeastId->ToString()
+ << " result# " << result.ToString()
+ << " NextBlobSeqId# " << NextBlobSeqId
+ << " GivenIdRanges# " << GivenIdRanges.ToString()
+ << " SequenceNumbersInFlight# " << FormatList(SequenceNumbersInFlight));
+ LastReportedLeastId.emplace(result);
+ return result;
}
};
std::vector<TChannelInfo> Channels;
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index e6d99c7f41..2808646c15 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -357,17 +357,17 @@ namespace NKikimr::NBlobDepot {
}
std::sort(writesInFlight.begin(), writesInFlight.end());
- for (const auto& [channel, invalidatedStep] : items) {
- const ui32 channel_ = channel;
- auto& agentGivenIdRanges = agent.GivenIdRanges[channel];
- auto& givenIdRanges = Self->Channels[channel].GivenIdRanges;
+ for (const auto& [channelIndex, invalidatedStep] : items) {
+ auto& channel = Self->Channels[channelIndex];
+ auto& agentGivenIdRanges = agent.GivenIdRanges[channelIndex];
+ auto& givenIdRanges = channel.GivenIdRanges;
- auto begin = std::lower_bound(writesInFlight.begin(), writesInFlight.end(), TBlobSeqId{channel, 0, 0, 0});
+ auto begin = std::lower_bound(writesInFlight.begin(), writesInFlight.end(), TBlobSeqId{channelIndex, 0, 0, 0});
auto makeWritesInFlight = [&] {
TStringStream s;
s << "[";
- for (auto it = begin; it != writesInFlight.end() && it->Channel == channel_; ++it) {
+ for (auto it = begin; it != writesInFlight.end() && it->Channel == channel.Index; ++it) {
s << (it != begin ? " " : "") << it->ToString();
}
s << "]";
@@ -375,33 +375,30 @@ namespace NKikimr::NBlobDepot {
};
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT13, "Trim", (Id, Self->GetLogId()), (AgentId, agent.Connection->NodeId),
- (Id, ev->Cookie), (Channel, channel_), (InvalidatedStep, invalidatedStep),
- (GivenIdRanges, Self->Channels[channel].GivenIdRanges),
- (Agent.GivenIdRanges, agent.GivenIdRanges[channel]),
+ (Id, ev->Cookie), (Channel, channelIndex), (InvalidatedStep, invalidatedStep),
+ (GivenIdRanges, channel.GivenIdRanges),
+ (Agent.GivenIdRanges, agent.GivenIdRanges[channelIndex]),
(WritesInFlight, makeWritesInFlight()));
// sanity check -- ensure that current writes in flight would be conserved when processing garbage
- for (auto it = begin; it != writesInFlight.end() && it->Channel == channel; ++it) {
+ for (auto it = begin; it != writesInFlight.end() && it->Channel == channelIndex; ++it) {
Y_VERIFY_S(agentGivenIdRanges.GetPoint(it->ToSequentialNumber()), "blobSeqId# " << it->ToString());
Y_VERIFY_S(givenIdRanges.GetPoint(it->ToSequentialNumber()), "blobSeqId# " << it->ToString());
}
- const TBlobSeqId leastExpectedBlobIdBefore = Self->Channels[channel].GetLeastExpectedBlobId(generation);
+ const TBlobSeqId leastExpectedBlobIdBefore = channel.GetLeastExpectedBlobId(generation);
- const TBlobSeqId trimmedBlobSeqId{channel, generation, invalidatedStep, TBlobSeqId::MaxIndex};
+ const TBlobSeqId trimmedBlobSeqId{channelIndex, generation, invalidatedStep, TBlobSeqId::MaxIndex};
const ui64 validSince = trimmedBlobSeqId.ToSequentialNumber() + 1;
givenIdRanges.Subtract(agentGivenIdRanges.Trim(validSince));
- for (auto it = begin; it != writesInFlight.end() && it->Channel == channel; ++it) {
+ for (auto it = begin; it != writesInFlight.end() && it->Channel == channelIndex; ++it) {
agentGivenIdRanges.AddPoint(it->ToSequentialNumber());
givenIdRanges.AddPoint(it->ToSequentialNumber());
}
- const TBlobSeqId leastExpectedBlobIdAfter = Self->Channels[channel].GetLeastExpectedBlobId(generation);
- Y_VERIFY(leastExpectedBlobIdBefore <= leastExpectedBlobIdAfter);
-
- if (leastExpectedBlobIdBefore != leastExpectedBlobIdAfter) {
- OnLeastExpectedBlobIdChange(channel);
+ if (channel.GetLeastExpectedBlobId(generation) != leastExpectedBlobIdBefore) {
+ OnLeastExpectedBlobIdChange(channelIndex);
}
}
}
@@ -448,7 +445,8 @@ namespace NKikimr::NBlobDepot {
}
}
- bool TData::CanBeCollected(ui32 groupId, TBlobSeqId id) const {
+ bool TData::CanBeCollected(TBlobSeqId id) const {
+ const ui32 groupId = Self->Info()->GroupFor(id.Channel, id.Generation);
const auto it = RecordsPerChannelGroup.find(std::make_tuple(id.Channel, groupId));
return it != RecordsPerChannelGroup.end() && TGenStep(id) <= it->second.IssuedGenStep;
}
@@ -496,4 +494,45 @@ namespace NKikimr::NBlobDepot {
}
}
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ bool TData::BeginCommittingBlobSeqId(TAgent& agent, TBlobSeqId blobSeqId) {
+ const ui32 generation = Self->Executor()->Generation();
+ if (blobSeqId.Generation != generation) {
+ return false;
+ }
+
+ Y_VERIFY(blobSeqId.Channel < Self->Channels.size());
+ auto& channel = Self->Channels[blobSeqId.Channel];
+ const ui64 value = blobSeqId.ToSequentialNumber();
+ Y_VERIFY_S(agent.GivenIdRanges[blobSeqId.Channel].GetPoint(value), "BlobSeqId# " << blobSeqId.ToString() << " Id# " << Self->GetLogId());
+ Y_VERIFY_S(channel.GivenIdRanges.GetPoint(value), " BlobSeqId# " << blobSeqId.ToString() << " Id# " << Self->GetLogId());
+ const bool inserted = channel.SequenceNumbersInFlight.insert(value).second;
+ Y_VERIFY(inserted);
+
+ return true;
+ }
+
+ void TData::EndCommittingBlobSeqId(TAgent& agent, TBlobSeqId blobSeqId) {
+
+ Y_VERIFY(blobSeqId.Channel < Self->Channels.size());
+ auto& channel = Self->Channels[blobSeqId.Channel];
+
+ const ui32 generation = Self->Executor()->Generation();
+ const auto leastExpectedBlobIdBefore = channel.GetLeastExpectedBlobId(generation);
+
+ const size_t numErased = channel.SequenceNumbersInFlight.erase(blobSeqId.ToSequentialNumber());
+ Y_VERIFY(numErased == 1);
+
+ const ui64 value = blobSeqId.ToSequentialNumber();
+ if (channel.GivenIdRanges.GetPoint(value)) { // if not set, it must have been trimmed by the agent during transaction (or even reset)
+ agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value);
+ channel.GivenIdRanges.RemovePoint(value);
+ }
+
+ if (channel.GetLeastExpectedBlobId(generation) != leastExpectedBlobIdBefore) {
+ OnLeastExpectedBlobIdChange(blobSeqId.Channel);
+ }
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 95f8ffe315..21574f2228 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -463,7 +463,7 @@ namespace NKikimr::NBlobDepot {
void AddFirstMentionedBlob(TLogoBlobID id);
void AccountBlob(TLogoBlobID id, bool add);
- bool CanBeCollected(ui32 groupId, TBlobSeqId id) const;
+ bool CanBeCollected(TBlobSeqId id) const;
void OnLeastExpectedBlobIdChange(ui8 channel);
@@ -498,6 +498,11 @@ namespace NKikimr::NBlobDepot {
void RenderMainPage(IOutputStream& s);
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ bool BeginCommittingBlobSeqId(TAgent& agent, TBlobSeqId blobSeqId);
+ void EndCommittingBlobSeqId(TAgent& agent, TBlobSeqId blobSeqId);
+
private:
void ExecuteIssueGC(ui8 channel, ui32 groupId, TGenStep issuedGenStep,
std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage, ui64 cookie);
diff --git a/ydb/core/blob_depot/given_id_range.cpp b/ydb/core/blob_depot/given_id_range.cpp
index fd11e50156..b92864df28 100644
--- a/ydb/core/blob_depot/given_id_range.cpp
+++ b/ydb/core/blob_depot/given_id_range.cpp
@@ -37,15 +37,12 @@ namespace NKikimr::NBlobDepot {
++NumAvailableItems;
}
- void TGivenIdRange::RemovePoint(ui64 value, bool *wasLeast) {
+ void TGivenIdRange::RemovePoint(ui64 value) {
const ui64 key = value / BitsPerChunk;
const auto it = Ranges.find(key);
Y_VERIFY(it != Ranges.end());
TChunk& chunk = it->second;
const size_t offset = value % BitsPerChunk;
- if (wasLeast) {
- *wasLeast = it == Ranges.begin() && chunk.FirstNonZeroBit() == offset;
- }
chunk.Reset(offset);
--NumAvailableItems;
if (chunk.Empty()) {
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index a7fb2c255d..11524247f6 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -8,19 +8,37 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev) {
class TTxCommitBlobSeq : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
const ui32 NodeId;
+ const ui64 AgentInstanceId;
std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> Request;
std::unique_ptr<IEventHandle> Response;
+ std::vector<TBlobSeqId> BlobSeqIds;
public:
- TTxCommitBlobSeq(TBlobDepot *self, ui32 nodeId, std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> request)
+ TTxCommitBlobSeq(TBlobDepot *self, TAgent& agent, std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> request)
: TTransactionBase(self)
- , NodeId(nodeId)
+ , NodeId(agent.Connection->NodeId)
+ , AgentInstanceId(*agent.AgentInstanceId)
, Request(std::move(request))
- {}
+ {
+ const ui32 generation = Self->Executor()->Generation();
+ for (const auto& item : Request->Get()->Record.GetItems()) {
+ if (TData::TValue::Validate(item) && !item.GetCommitNotify()) {
+ const auto blobSeqId = TBlobSeqId::FromProto(item.GetBlobLocator().GetBlobSeqId());
+ if (Self->Data->CanBeCollected(blobSeqId)) {
+ // check for internal sanity -- we can't issue barriers on given ids without confirmed trimming
+ Y_VERIFY_S(blobSeqId.Generation < generation, "committing trimmed BlobSeqId"
+ << " BlobSeqId# " << blobSeqId.ToString()
+ << " Id# " << Self->GetLogId());
+ } else if (Self->Data->BeginCommittingBlobSeqId(agent, blobSeqId)) {
+ BlobSeqIds.push_back(blobSeqId);
+ }
+ }
+ }
+ }
bool Execute(TTransactionContext& txc, const TActorContext&) override {
TAgent& agent = Self->GetAgent(NodeId);
- if (!agent.Connection) { // agent disconnected while transaction was in queue -- drop this request
+ if (!agent.Connection || agent.AgentInstanceId != AgentInstanceId) { // agent disconnected while transaction was in queue -- drop this request
return true;
}
@@ -36,8 +54,6 @@ namespace NKikimr::NBlobDepot {
const ui32 generation = Self->Executor()->Generation();
for (const auto& item : Request->Get()->Record.GetItems()) {
- auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config);
-
auto *responseItem = responseRecord->AddItems();
if (!TData::TValue::Validate(item)) {
responseItem->SetStatus(NKikimrProto::ERROR);
@@ -49,15 +65,14 @@ namespace NKikimr::NBlobDepot {
responseItem->SetStatus(NKikimrProto::OK);
const auto blobSeqId = TBlobSeqId::FromProto(blobLocator.GetBlobSeqId());
- const bool canBeCollected = Self->Data->CanBeCollected(blobLocator.GetGroupId(), blobSeqId);
+ const bool canBeCollected = Self->Data->CanBeCollected(blobSeqId);
+
+ auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config);
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT68, "TTxCommitBlobSeq process key", (Id, Self->GetLogId()),
(Key, key), (Item, item), (CanBeCollected, canBeCollected), (Generation, generation));
- if (blobSeqId.Generation == generation) {
- // check for internal sanity -- we can't issue barriers on given ids without confirmed trimming
- Y_VERIFY_S(!canBeCollected || item.GetCommitNotify(), "BlobSeqId# " << blobSeqId.ToString());
- } else if (canBeCollected) {
+ if (canBeCollected) {
// we can't accept this record, because it is potentially under already issued barrier
responseItem->SetStatus(NKikimrProto::ERROR);
responseItem->SetErrorReason("generation race");
@@ -84,12 +99,6 @@ namespace NKikimr::NBlobDepot {
}
} else {
Self->Data->UpdateKey(key, item, txc, this);
- if (blobSeqId.Generation == generation) {
- // mark given blob as committed only when it was issued in current generation -- only for this
- // generation we have correct GivenIdRanges; and we can do this only after updating key as the
- // callee function may trigger garbage collection
- MarkGivenIdCommitted(agent, blobSeqId);
- }
}
}
@@ -119,24 +128,6 @@ namespace NKikimr::NBlobDepot {
return success;
}
- void MarkGivenIdCommitted(TAgent& agent, const TBlobSeqId& blobSeqId) {
- Y_VERIFY(blobSeqId.Channel < Self->Channels.size());
-
- const ui64 value = blobSeqId.ToSequentialNumber();
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT18, "MarkGivenIdCommitted", (Id, Self->GetLogId()),
- (AgentId, agent.Connection->NodeId), (BlobSeqId, blobSeqId), (Value, value),
- (GivenIdRanges, Self->Channels[blobSeqId.Channel].GivenIdRanges),
- (Agent.GivenIdRanges, agent.GivenIdRanges[blobSeqId.Channel]));
-
- agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value, nullptr);
-
- bool wasLeast;
- Self->Channels[blobSeqId.Channel].GivenIdRanges.RemovePoint(value, &wasLeast);
- if (wasLeast) {
- Self->Data->OnLeastExpectedBlobIdChange(blobSeqId.Channel);
- }
- }
-
bool CheckKeyAgainstBarrier(const TData::TKey& key, TString *error) {
const auto& v = key.AsVariant();
if (const auto *id = std::get_if<TLogoBlobID>(&v)) {
@@ -159,13 +150,16 @@ namespace NKikimr::NBlobDepot {
}
void Complete(const TActorContext&) override {
+ TAgent& agent = Self->GetAgent(NodeId);
+ for (const TBlobSeqId blobSeqId : BlobSeqIds) {
+ Self->Data->EndCommittingBlobSeqId(agent, blobSeqId);
+ }
Self->Data->CommitTrash(this);
TActivationContext::Send(Response.release());
}
};
- TAgent& agent = GetAgent(ev->Recipient);
- Execute(std::make_unique<TTxCommitBlobSeq>(this, agent.Connection->NodeId,
+ Execute(std::make_unique<TTxCommitBlobSeq>(this, GetAgent(ev->Recipient),
std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle>(ev.Release())));
}
@@ -182,11 +176,15 @@ namespace NKikimr::NBlobDepot {
const auto blobSeqId = TBlobSeqId::FromProto(item);
if (blobSeqId.Generation == generation) {
Y_VERIFY(blobSeqId.Channel < Channels.size());
+ auto& channel = Channels[blobSeqId.Channel];
+
+ const TBlobSeqId leastExpectedBlobIdBefore = channel.GetLeastExpectedBlobId(generation);
+
const ui64 value = blobSeqId.ToSequentialNumber();
- agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value, nullptr);
- bool wasLeast;
- Channels[blobSeqId.Channel].GivenIdRanges.RemovePoint(value, &wasLeast);
- if (wasLeast) {
+ agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value);
+ Channels[blobSeqId.Channel].GivenIdRanges.RemovePoint(value);
+
+ if (channel.GetLeastExpectedBlobId(generation) != leastExpectedBlobIdBefore) {
Data->OnLeastExpectedBlobIdChange(blobSeqId.Channel);
}
}
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index 1b926f5f87..2add0298c2 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -106,7 +106,7 @@ namespace NKikimr::NBlobDepot {
public:
void IssueNewRange(ui64 begin, ui64 end);
void AddPoint(ui64 value);
- void RemovePoint(ui64 value, bool *wasLeast);
+ void RemovePoint(ui64 value);
bool GetPoint(ui64 value) const;
bool IsEmpty() const;
@@ -122,6 +122,15 @@ namespace NKikimr::NBlobDepot {
std::vector<bool> ToDebugArray(size_t numItems) const;
void CheckConsistency() const;
+
+ template<typename T>
+ void ForEach(T&& callback) const {
+ for (const auto& [index, chunk] : Ranges) {
+ Y_FOR_EACH_BIT(offset, chunk) {
+ callback(index * BitsPerChunk + offset);
+ }
+ }
+ }
};
using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>;