diff options
author | alexvru <alexvru@ydb.tech> | 2023-07-20 19:01:36 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-07-20 19:01:36 +0300 |
commit | 81b72a16387307cd9ee2b69e36a9bb4f85d07a65 (patch) | |
tree | 9fd33f636fc8182f43764c4acc47b1fb91c8d8b8 | |
parent | 02b77500dbd3deed2330d5401463c06159433cdb (diff) | |
download | ydb-81b72a16387307cd9ee2b69e36a9bb4f85d07a65.tar.gz |
Ordered execution of CmdTrimLeakedBlobs vs garbage collection KIKIMR-18784
-rw-r--r-- | ydb/core/keyvalue/keyvalue_flat_impl.h | 4 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 55 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 7 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state_collect.cpp | 8 |
4 files changed, 52 insertions, 22 deletions
diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h index 5e97a383c1d..ecdd246c058 100644 --- a/ydb/core/keyvalue/keyvalue_flat_impl.h +++ b/ydb/core/keyvalue/keyvalue_flat_impl.h @@ -228,7 +228,7 @@ protected: }; using TExecuteMethod = void (TKeyValueState::*)(ISimpleDb &db, const TActorContext &ctx); - using TCompleteMethod = void (TKeyValueState::*)(const TActorContext &ctx); + using TCompleteMethod = void (TKeyValueState::*)(const TActorContext &ctx, const TTabletStorageInfo *info); template <typename TDerived, TExecuteMethod ExecuteMethod, TCompleteMethod CompleteMethod> struct TTxUniversal : NTabletFlatExecutor::ITransaction { @@ -248,7 +248,7 @@ protected: void Complete(const TActorContext &ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID() << ' ' << TDerived::Name << " Complete"); - (Self->State.*CompleteMethod)(ctx); + (Self->State.*CompleteMethod)(ctx, Self->Info()); } }; diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index 9ea2fb90864..b39de392d84 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -668,7 +668,7 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e // corner case, if no CollectGarbage events were sent if (InitialCollectsSent == 0) { SendCutHistory(ctx); - RegisterInitialGCCompletionComplete(ctx); + RegisterInitialGCCompletionComplete(ctx, info); } else { IsCollectEventSent = true; } @@ -693,9 +693,10 @@ void TKeyValueState::RegisterInitialGCCompletionExecute(ISimpleDb &db, const TAc THelpers::DbUpdateState(StoredState, db, ctx); } -void TKeyValueState::RegisterInitialGCCompletionComplete(const TActorContext &ctx) { +void TKeyValueState::RegisterInitialGCCompletionComplete(const TActorContext &ctx, const TTabletStorageInfo *info) { IsCollectEventSent = false; // initiate collection if trash was loaded from local base + ProcessPostponedTrims(ctx, info); PrepareCollectIfNeeded(ctx); } @@ -1702,6 +1703,7 @@ void TKeyValueState::OnRequestComplete(ui64 requestUid, ui64 generation, ui64 st CountRequestTakeOffOrEnqueue(requestType); } + CmdTrimLeakedBlobsUids.erase(requestUid); CancelInFlight(requestUid); PrepareCollectIfNeeded(ctx); } @@ -2868,6 +2870,16 @@ void TKeyValueState::ProcessPostponedIntermediate(const TActorContext& ctx, THol } } +void TKeyValueState::ProcessPostponedTrims(const TActorContext& ctx, const TTabletStorageInfo *info) { + if (!IsCollectEventSent && !InitialCollectsSent) { + for (auto& interm : CmdTrimLeakedBlobsPostponed) { + CmdTrimLeakedBlobsUids.insert(interm->RequestUid); + RegisterRequestActor(ctx, std::move(interm), info, ExecutorGeneration); + } + CmdTrimLeakedBlobsPostponed.clear(); + } +} + void TKeyValueState::OnEvReadRequest(TEvKeyValue::TEvRead::TPtr &ev, const TActorContext &ctx, const TTabletStorageInfo *info) { @@ -3030,6 +3042,8 @@ void TKeyValueState::OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActor bool hasReads = request.CmdReadSize() || request.CmdReadRangeSize(); + bool hasTrim = request.HasCmdTrimLeakedBlobs(); + TRequestType::EType requestType; if (hasWrites) { if (hasReads) { @@ -3045,25 +3059,32 @@ void TKeyValueState::OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActor if (PrepareIntermediate(ev, intermediate, requestType, ctx, info)) { // Spawn KeyValueStorageRequest actor on the same thread - if (requestType == TRequestType::WriteOnly) { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId - << " Create storage request for WO, Marker# KV42"); - RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); - } else if (requestType == TRequestType::ReadOnlyInline) { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId - << " Create storage request for RO_INLINE, Marker# KV45"); - RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); - ++RoInlineIntermediatesInFlight; + if (hasTrim && (IsCollectEventSent || InitialCollectsSent)) { + CmdTrimLeakedBlobsPostponed.push_back(std::move(intermediate)); } else { - if (IntermediatesInFlight < IntermediatesInFlightLimit) { + if (hasTrim) { + CmdTrimLeakedBlobsUids.insert(intermediate->RequestUid); + } + if (requestType == TRequestType::WriteOnly) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId - << " Create storage request for RO/RW, Marker# KV43"); + << " Create storage request for WO, Marker# KV42"); RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); - ++IntermediatesInFlight; - } else { + } else if (requestType == TRequestType::ReadOnlyInline) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId - << " Enqueue storage request for RO/RW, Marker# KV44"); - PostponeIntermediate<TEvKeyValue::TEvRequest>(std::move(intermediate)); + << " Create storage request for RO_INLINE, Marker# KV45"); + RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); + ++RoInlineIntermediatesInFlight; + } else { + if (IntermediatesInFlight < IntermediatesInFlightLimit) { + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId + << " Create storage request for RO/RW, Marker# KV43"); + RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); + ++IntermediatesInFlight; + } else { + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId + << " Enqueue storage request for RO/RW, Marker# KV44"); + PostponeIntermediate<TEvKeyValue::TEvRequest>(std::move(intermediate)); + } } } diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index e7749f33904..17edd034494 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -256,6 +256,8 @@ protected: TSet<TLogoBlobID> Trash; TMap<ui64, ui64> InFlightForStep; TMap<std::tuple<ui64, ui32>, ui32> RequestUidStepToCount; + THashSet<ui64> CmdTrimLeakedBlobsUids; + std::vector<THolder<TIntermediate>> CmdTrimLeakedBlobsPostponed; THashMap<ui64, TInstant> RequestInputTime; ui64 NextRequestUid = 1; TIntrusivePtr<TCollectOperation> CollectOperation; @@ -327,7 +329,7 @@ public: const TActorContext &ctx, const TTabletStorageInfo *info); bool RegisterInitialCollectResult(const TActorContext &ctx); void RegisterInitialGCCompletionExecute(ISimpleDb &db, const TActorContext &ctx); - void RegisterInitialGCCompletionComplete(const TActorContext &ctx); + void RegisterInitialGCCompletionComplete(const TActorContext &ctx, const TTabletStorageInfo *info); void SendCutHistory(const TActorContext &ctx); void OnInitQueueEmpty(const TActorContext &ctx); void OnStateWork(const TActorContext &ctx); @@ -337,13 +339,14 @@ public: void DropRefCountsOnErrorInTx(std::deque<std::pair<TLogoBlobID, bool>>&& refCountsIncr, ISimpleDb& db, const TActorContext& ctx); void DropRefCountsOnError(std::deque<std::pair<TLogoBlobID, bool>>& refCountsIncr /*in-out*/, bool writesMade, const TActorContext& ctx); + void ProcessPostponedTrims(const TActorContext& ctx, const TTabletStorageInfo *info); // garbage collection methods void PrepareCollectIfNeeded(const TActorContext &ctx); bool RemoveCollectedTrash(ISimpleDb &db, const TActorContext &ctx); void UpdateStoredState(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep); void CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx); - void CompleteGCComplete(const TActorContext &ctx); + void CompleteGCComplete(const TActorContext &ctx, const TTabletStorageInfo *info); void StartGC(const TActorContext &ctx, TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep, TVector<TLogoBlobID>& trashGoingToCollect); void StartCollectingIfPossible(const TActorContext &ctx); diff --git a/ydb/core/keyvalue/keyvalue_state_collect.cpp b/ydb/core/keyvalue/keyvalue_state_collect.cpp index 18f111ea04c..27e5ecadc8f 100644 --- a/ydb/core/keyvalue/keyvalue_state_collect.cpp +++ b/ydb/core/keyvalue/keyvalue_state_collect.cpp @@ -7,6 +7,11 @@ namespace NKeyValue { void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) { LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "PrepareCollectIfNeeded KeyValue# " << TabletId << " Marker# KV61"); + if (CmdTrimLeakedBlobsUids) { + // Do not start garbage collection while we are trimming to avoid race. + return; + } + if (IsCollectEventSent || InitialCollectsSent) { // We already are trying to collect something, just pass this time. return; @@ -95,7 +100,7 @@ void TKeyValueState::CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) } } -void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) { +void TKeyValueState::CompleteGCComplete(const TActorContext &ctx, const TTabletStorageInfo *info) { if (RepeatGCTX) { STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC20, "Repeat CompleteGC", (TabletId, TabletId), @@ -110,6 +115,7 @@ void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) { STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC22, "CompleteGC Complete", (TabletId, TabletId), (TrashCount, Trash.size())); + ProcessPostponedTrims(ctx, info); PrepareCollectIfNeeded(ctx); } |