diff options
author | alexvru <alexvru@ydb.tech> | 2023-07-21 12:28:45 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-07-21 12:28:45 +0300 |
commit | f23ee396c023ff431442b8322dd075ac8b8dff63 (patch) | |
tree | f4598b243fbc2b45476ed9c78182a4333d38473b | |
parent | d8bf44488de42e74f19a724f685c6e9dde6a7239 (diff) | |
download | ydb-f23ee396c023ff431442b8322dd075ac8b8dff63.tar.gz |
Fix trash processing before actual commit KIKIMR-18784
-rw-r--r-- | ydb/core/keyvalue/keyvalue_flat_impl.h | 8 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_intermediate.h | 2 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 57 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 9 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_storage_request.cpp | 5 |
5 files changed, 46 insertions, 35 deletions
diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h index ecdd246c058..9b42199f91f 100644 --- a/ydb/core/keyvalue/keyvalue_flat_impl.h +++ b/ydb/core/keyvalue/keyvalue_flat_impl.h @@ -165,10 +165,12 @@ protected: struct TTxDropRefCountsOnError : NTabletFlatExecutor::ITransaction { std::deque<std::pair<TLogoBlobID, bool>> RefCountsIncr; + const ui64 RequestUid; TKeyValueFlat *Self; - TTxDropRefCountsOnError(std::deque<std::pair<TLogoBlobID, bool>>&& refCountsIncr, TKeyValueFlat *self) + TTxDropRefCountsOnError(std::deque<std::pair<TLogoBlobID, bool>>&& refCountsIncr, ui64 requestUid, TKeyValueFlat *self) : RefCountsIncr(std::move(refCountsIncr)) + , RequestUid(requestUid) , Self(self) {} @@ -178,7 +180,7 @@ protected: LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID() << " TTxDropRefCountsOnError Execute"); if (!Self->State.GetIsDamaged()) { TSimpleDbFlat db(txc.DB); - Self->State.DropRefCountsOnErrorInTx(std::move(RefCountsIncr), db, ctx); + Self->State.DropRefCountsOnErrorInTx(std::move(RefCountsIncr), db, ctx, RequestUid); } return true; } @@ -385,7 +387,7 @@ protected: State.OnRequestComplete(event.RequestUid, event.Generation, event.Step, ctx, Info(), event.Status, event.Stat); State.DropRefCountsOnError(event.RefCountsIncr, true, ctx); if (!event.RefCountsIncr.empty()) { - Execute(new TTxDropRefCountsOnError(std::move(event.RefCountsIncr), this), ctx); + Execute(new TTxDropRefCountsOnError(std::move(event.RefCountsIncr), event.RequestUid, this), ctx); } } diff --git a/ydb/core/keyvalue/keyvalue_intermediate.h b/ydb/core/keyvalue/keyvalue_intermediate.h index 5f5b7e20e8f..8df55fcaed7 100644 --- a/ydb/core/keyvalue/keyvalue_intermediate.h +++ b/ydb/core/keyvalue/keyvalue_intermediate.h @@ -87,7 +87,7 @@ struct TIntermediate { }; struct TGetStatus { NKikimrClient::TKeyValueRequest::EStorageChannel StorageChannel; - TLogoBlobID LogoBlobId; + ui32 GroupId; NKikimrProto::EReplyStatus Status; TStorageStatusFlags StatusFlags; }; diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index b39de392d84..aba59e184fb 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -754,10 +754,8 @@ TLogoBlobID TKeyValueState::AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx } Y_VERIFY(!CollectOperation || THelpers::GenerationStep(id) > THelpers::TGenerationStep(CollectOperation->Header.GetCollectGeneration(), CollectOperation->Header.GetCollectStep())); - if (requestUid) { - ++InFlightForStep[id.Step()]; - ++RequestUidStepToCount[std::make_tuple(requestUid, id.Step())]; - } + ++InFlightForStep[id.Step()]; + ++RequestUidStepToCount[std::make_tuple(requestUid, id.Step())]; return id; } @@ -784,7 +782,7 @@ void TKeyValueState::RequestExecute(THolder<TIntermediate> &intermediate, ISimpl // All reads done intermediate->Response.SetStatus(NMsgBusProxy::MSTATUS_REJECTED); intermediate->Response.SetErrorReason(str.Str()); - DropRefCountsOnErrorInTx(std::move(intermediate->RefCountsIncr), db, ctx); + DropRefCountsOnErrorInTx(std::move(intermediate->RefCountsIncr), db, ctx, intermediate->RequestUid); return; } } @@ -816,7 +814,7 @@ void TKeyValueState::RequestExecute(THolder<TIntermediate> &intermediate, ISimpl // All reads done intermediate->Response.SetStatus(NMsgBusProxy::MSTATUS_INTERNALERROR); intermediate->Response.SetErrorReason(str.Str()); - DropRefCountsOnErrorInTx(std::move(intermediate->RefCountsIncr), db, ctx); + DropRefCountsOnErrorInTx(std::move(intermediate->RefCountsIncr), db, ctx, intermediate->RequestUid); return; } } @@ -831,14 +829,20 @@ void TKeyValueState::RequestComplete(THolder<TIntermediate> &intermediate, const const TTabletStorageInfo *info) { Reply(intermediate, ctx, info); + + if (const auto it = TrashBeingCommitted.find(intermediate->RequestUid); it != TrashBeingCommitted.end()) { + Trash.insert(it->second.begin(), it->second.end()); + TrashBeingCommitted.erase(it); + } + // Make sure there is no way for a successfull delete transaction to end without getting here PrepareCollectIfNeeded(ctx); } void TKeyValueState::DropRefCountsOnErrorInTx(std::deque<std::pair<TLogoBlobID, bool>>&& refCountsIncr, ISimpleDb& db, - const TActorContext& ctx) { + const TActorContext& ctx, ui64 requestUid) { for (const auto& [logoBlobId, initial] : refCountsIncr) { - Dereference(logoBlobId, db, ctx, initial); + Dereference(logoBlobId, db, ctx, initial, requestUid); } } @@ -1025,10 +1029,10 @@ void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request, NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse, NKikimrKeyValue::StorageChannel *response, ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime, - TIntermediate* /*intermediate*/) + TIntermediate *intermediate) { TIndexRecord& record = Index[request.Key]; - Dereference(record, db, ctx); + Dereference(record, db, ctx, intermediate->RequestUid); record.Chain = {}; ui32 storage_channel = 0; @@ -1077,12 +1081,16 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request, NKikimrClient::TKeyValueResponse::TDeleteRangeResult *legacyResponse, NKikimrKeyValue::StorageChannel */*response*/, ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 /*unixTime*/, - TIntermediate* /*intermediate*/) + TIntermediate *intermediate) { TraverseRange(request.Range, [&](TIndex::iterator it) { stat.Deletes++; stat.DeleteBytes += it->second.GetFullValueSize(); - Dereference(it->second, db, ctx); + for (const auto& chain : it->second.Chain) { + STLOG(PRI_DEBUG, KEYVALUE_GC, KVC99, "DeleteRange", (TabletId, TabletId), (Generation, ExecutorGeneration), + (Key, it->first), (Id, chain.LogoBlobId)); + } + Dereference(it->second, db, ctx, intermediate->RequestUid); THelpers::DbEraseUserKey(it->first, db, ctx); Index.erase(it); }); @@ -1096,14 +1104,14 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TRename &request, NKikimrClient::TKeyValueResponse::TRenameResult *legacyResponse, NKikimrKeyValue::StorageChannel */*response*/, ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime, - TIntermediate* /*intermediate*/) + TIntermediate *intermediate) { auto oldIter = Index.find(request.OldKey); Y_VERIFY(oldIter != Index.end()); TIndexRecord& source = oldIter->second; TIndexRecord& dest = Index[request.NewKey]; - Dereference(dest, db, ctx); + Dereference(dest, db, ctx, intermediate->RequestUid); dest.Chain = std::move(source.Chain); dest.CreationUnixTime = unixTime; @@ -1142,7 +1150,7 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TCopyRange &request, TString newKey = request.PrefixToAdd + it->first.substr(request.PrefixToRemove.size()); TIndexRecord& record = Index[newKey]; - Dereference(record, db, ctx); + Dereference(record, db, ctx, intermediate->RequestUid); record.Chain = sourceRecord.Chain; record.CreationUnixTime = sourceRecord.CreationUnixTime; UpdateKeyValue(newKey, record, db, ctx); @@ -1180,14 +1188,14 @@ void TKeyValueState::ProcessCmd(const TIntermediate::TConcat &request, } if (!request.KeepInputs) { - Dereference(input, db, ctx); + Dereference(input, db, ctx, intermediate->RequestUid); THelpers::DbEraseUserKey(it->first, db, ctx); Index.erase(it); } } TIndexRecord& record = Index[request.OutputKey]; - Dereference(record, db, ctx); + Dereference(record, db, ctx, intermediate->RequestUid); record.Chain = std::move(chain); record.CreationUnixTime = unixTime; UpdateKeyValue(request.OutputKey, record, db, ctx); @@ -1575,7 +1583,7 @@ void TKeyValueState::ProcessCmds(THolder<TIntermediate> &intermediate, ISimpleDb success = success && CheckCmds(intermediate, ctx, keys, info); success = success && CheckCmdGetStatus(intermediate, ctx, keys, info); if (!success) { - DropRefCountsOnErrorInTx(std::exchange(intermediate->RefCountsIncr, {}), db, ctx); + DropRefCountsOnErrorInTx(std::exchange(intermediate->RefCountsIncr, {}), db, ctx, intermediate->RequestUid); } else { // Read + validate CmdRead(intermediate, db, ctx); @@ -1621,21 +1629,22 @@ bool TKeyValueState::IncrementGeneration(THolder<TIntermediate> &intermediate, I return true; } -void TKeyValueState::Dereference(const TIndexRecord& record, ISimpleDb& db, const TActorContext& ctx) { +void TKeyValueState::Dereference(const TIndexRecord& record, ISimpleDb& db, const TActorContext& ctx, ui64 requestUid) { for (const TIndexRecord::TChainItem& item : record.Chain) { if (!item.IsInline()) { - Dereference(item.LogoBlobId, db, ctx, false); + Dereference(item.LogoBlobId, db, ctx, false, requestUid); } } } -void TKeyValueState::Dereference(const TLogoBlobID& id, ISimpleDb& db, const TActorContext& ctx, bool initial) { +void TKeyValueState::Dereference(const TLogoBlobID& id, ISimpleDb& db, const TActorContext& ctx, bool initial, + ui64 requestUid) { auto it = RefCounts.find(id); Y_VERIFY(it != RefCounts.end()); --it->second; if (!it->second) { RefCounts.erase(it); - Trash.insert(id); + TrashBeingCommitted[requestUid].push_back(id); THelpers::DbUpdateTrash(id, db, ctx); if (initial) { CountInitialTrashRecord(id.BlobSize()); @@ -2273,7 +2282,7 @@ TKeyValueState::TPrepareResult TKeyValueState::InitGetStatusCommand(TIntermediat TString msg; if (storageChannel == NKikimrClient::TKeyValueRequest::INLINE) { cmd.StorageChannel = storageChannel; - cmd.LogoBlobId = TLogoBlobID(); + cmd.GroupId = 0; cmd.Status = NKikimrProto::OK; cmd.StatusFlags = TStorageStatusFlags(ui32(NKikimrBlobStorage::StatusIsValid)); if (GetIsTabletYellowMove()) { @@ -2294,7 +2303,7 @@ TKeyValueState::TPrepareResult TKeyValueState::InitGetStatusCommand(TIntermediat } cmd.StorageChannel = storageChannel; - cmd.LogoBlobId = AllocateLogoBlobId(1, storageChannelIdx, 0); + cmd.GroupId = info->GroupFor(storageChannelIdx, ExecutorGeneration); cmd.Status = NKikimrProto::UNKNOWN; } return {false, msg}; diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index 17edd034494..901cd5d19ef 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -254,6 +254,7 @@ protected: TIndex Index; THashMap<TLogoBlobID, ui32> RefCounts; TSet<TLogoBlobID> Trash; + THashMap<ui64, TVector<TLogoBlobID>> TrashBeingCommitted; TMap<ui64, ui64> InFlightForStep; TMap<std::tuple<ui64, ui32>, ui32> RequestUidStepToCount; THashSet<ui64> CmdTrimLeakedBlobsUids; @@ -336,7 +337,8 @@ public: void RequestExecute(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx, const TTabletStorageInfo *info); void RequestComplete(THolder<TIntermediate> &intermediate, const TActorContext &ctx, const TTabletStorageInfo *info); - void DropRefCountsOnErrorInTx(std::deque<std::pair<TLogoBlobID, bool>>&& refCountsIncr, ISimpleDb& db, const TActorContext& ctx); + void DropRefCountsOnErrorInTx(std::deque<std::pair<TLogoBlobID, bool>>&& refCountsIncr, ISimpleDb& db, const TActorContext& ctx, + ui64 requestUid); void DropRefCountsOnError(std::deque<std::pair<TLogoBlobID, bool>>& refCountsIncr /*in-out*/, bool writesMade, const TActorContext& ctx); void ProcessPostponedTrims(const TActorContext& ctx, const TTabletStorageInfo *info); @@ -431,9 +433,10 @@ public: return CollectOperation; } - void Dereference(const TIndexRecord& record, ISimpleDb& db, const TActorContext& ctx); + void Dereference(const TIndexRecord& record, ISimpleDb& db, const TActorContext& ctx, ui64 requestUid); void UpdateKeyValue(const TString& key, const TIndexRecord& record, ISimpleDb& db, const TActorContext& ctx); - void Dereference(const TLogoBlobID& id, ISimpleDb& db, const TActorContext& ctx, bool initial); + void Dereference(const TLogoBlobID& id, ISimpleDb& db, const TActorContext& ctx, bool initial, + ui64 requestUid); ui32 GetPerGenerationCounter() { return PerGenerationCounter; diff --git a/ydb/core/keyvalue/keyvalue_storage_request.cpp b/ydb/core/keyvalue/keyvalue_storage_request.cpp index 0d780c546ec..84005ead266 100644 --- a/ydb/core/keyvalue/keyvalue_storage_request.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_request.cpp @@ -601,11 +601,8 @@ public: auto &getStatus = IntermediateResults->GetStatuses[i]; if (getStatus.Status != NKikimrProto::OK) { Y_VERIFY(getStatus.Status == NKikimrProto::UNKNOWN); - const ui32 groupId = TabletInfo->GroupFor(getStatus.LogoBlobId.Channel(), getStatus.LogoBlobId.Generation()); - Y_VERIFY(groupId != Max<ui32>(), "GetStatus Blob# %s is mapped to an invalid group (-1)!", - getStatus.LogoBlobId.ToString().c_str()); SendToBSProxy( - ctx, groupId, + ctx, getStatus.GroupId, new TEvBlobStorage::TEvStatus(IntermediateResults->Deadline), i); ++GetStatusRequestsSent; } |