aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-03-16 19:51:38 +0300
committeralexvru <alexvru@ydb.tech>2023-03-16 19:51:38 +0300
commitf34b420a371177f39c58bb3dde6afdab7d8edef5 (patch)
treeaeddeb2b8b2b0f83294a1b08ae3acb2193fff34b
parent3781a2b9d119c682e0ab48766a7762dda8acd9a3 (diff)
downloadydb-f34b420a371177f39c58bb3dde6afdab7d8edef5.tar.gz
Fix blob depot bug
-rw-r--r--ydb/core/blob_depot/data_decommit.cpp27
1 files changed, 19 insertions, 8 deletions
diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp
index 26b3b8f682..8dc4c59cd8 100644
--- a/ydb/core/blob_depot/data_decommit.cpp
+++ b/ydb/core/blob_depot/data_decommit.cpp
@@ -16,6 +16,8 @@ namespace NKikimr::NBlobDepot {
THashSet<TLogoBlobID> ResolutionErrors;
TEvBlobDepot::TEvResolve::TPtr Ev;
+ ui32 TxInFlight = 0;
+
ui32 RangesInFlight = 0;
std::deque<std::tuple<TLogoBlobID, bool>> GetQ;
@@ -28,6 +30,8 @@ namespace NKikimr::NBlobDepot {
THashMap<TLogoBlobID, TKey> IdToKey;
+ bool Finished = false;
+
public:
TResolveDecommitActor(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr ev)
: Self(self)
@@ -44,6 +48,7 @@ namespace NKikimr::NBlobDepot {
(Sender, Ev->Sender), (Cookie, Ev->Cookie));
Self->Execute(std::make_unique<TCoroTx>(Self, Token, std::bind(&TThis::TxPrepare, this)));
+ ++TxInFlight;
Become(&TThis::StateFunc);
}
@@ -111,7 +116,7 @@ namespace NKikimr::NBlobDepot {
}
TCoroTx::FinishTx();
- InvokeOtherActor(*this, &TThis::CheckIfDone);
+ TActivationContext::Send(new IEventHandleFat(TEvPrivate::EvTxComplete, 0, SelfId(), {}, nullptr, 0));
}
void ScanRangeAndIssueGets(TKey from, TKey to, TScanFlags flags) {
@@ -218,7 +223,7 @@ namespace NKikimr::NBlobDepot {
} else if (r.Status == NKikimrProto::NODATA) {
Self->Data->ExecuteTxCommitAssimilatedBlob(NKikimrProto::NODATA, TBlobSeqId(), TData::TKey(r.Id),
TEvPrivate::EvTxComplete, SelfId(), 0);
- ++PutsInFlight;
+ ++TxInFlight;
} else {
// mark this specific key as unresolvable
ResolutionErrors.emplace(r.Id);
@@ -281,17 +286,20 @@ namespace NKikimr::NBlobDepot {
}
void HandleTxComplete() {
- --PutsInFlight;
+ --TxInFlight;
CheckIfDone();
}
void CheckIfDone() {
- if (RangesInFlight + GetsInFlight + GetQ.size() + PutsInFlight == 0) {
+ if (TxInFlight + RangesInFlight + GetsInFlight + GetQ.size() + PutsInFlight == 0) {
FinishWithSuccess();
}
}
void FinishWithSuccess() {
+ Y_VERIFY(!Finished);
+ Finished = true;
+
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT92, "request succeeded", (Id, Self->GetLogId()), (Sender, Ev->Sender),
(Cookie, Ev->Cookie), (ResolutionErrors.size, ResolutionErrors.size()),
(DecommitBlobs.size, DecommitBlobs.size()));
@@ -317,11 +325,14 @@ namespace NKikimr::NBlobDepot {
}
void FinishWithError(NLog::EPriority prio, TString errorReason) {
- STLOG(prio, BLOB_DEPOT, BDT89, "request failed", (Id, Self->GetLogId()), (Sender, Ev->Sender),
+ Y_VERIFY(!Finished);
+ Finished = true;
+
+ STLOG(prio, BLOB_DEPOT, BDT89, "request failed", (Id, Self->GetLogId()), (Sender, Ev->Sender),
(Cookie, Ev->Cookie), (ErrorReason, errorReason));
- auto [response, record] = TEvBlobDepot::MakeResponseFor(*Ev, NKikimrProto::ERROR, std::move(errorReason));
- TActivationContext::Send(response.release());
- PassAway();
+ auto [response, record] = TEvBlobDepot::MakeResponseFor(*Ev, NKikimrProto::ERROR, std::move(errorReason));
+ TActivationContext::Send(response.release());
+ PassAway();
}
STATEFN(StateFunc) {