aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-07-20 19:01:36 +0300
committeralexvru <alexvru@ydb.tech>2023-07-20 19:01:36 +0300
commit81b72a16387307cd9ee2b69e36a9bb4f85d07a65 (patch)
tree9fd33f636fc8182f43764c4acc47b1fb91c8d8b8
parent02b77500dbd3deed2330d5401463c06159433cdb (diff)
downloadydb-81b72a16387307cd9ee2b69e36a9bb4f85d07a65.tar.gz
Ordered execution of CmdTrimLeakedBlobs vs garbage collection KIKIMR-18784
-rw-r--r--ydb/core/keyvalue/keyvalue_flat_impl.h4
-rw-r--r--ydb/core/keyvalue/keyvalue_state.cpp55
-rw-r--r--ydb/core/keyvalue/keyvalue_state.h7
-rw-r--r--ydb/core/keyvalue/keyvalue_state_collect.cpp8
4 files changed, 52 insertions, 22 deletions
diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h
index 5e97a383c1d..ecdd246c058 100644
--- a/ydb/core/keyvalue/keyvalue_flat_impl.h
+++ b/ydb/core/keyvalue/keyvalue_flat_impl.h
@@ -228,7 +228,7 @@ protected:
};
using TExecuteMethod = void (TKeyValueState::*)(ISimpleDb &db, const TActorContext &ctx);
- using TCompleteMethod = void (TKeyValueState::*)(const TActorContext &ctx);
+ using TCompleteMethod = void (TKeyValueState::*)(const TActorContext &ctx, const TTabletStorageInfo *info);
template <typename TDerived, TExecuteMethod ExecuteMethod, TCompleteMethod CompleteMethod>
struct TTxUniversal : NTabletFlatExecutor::ITransaction {
@@ -248,7 +248,7 @@ protected:
void Complete(const TActorContext &ctx) override {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID()
<< ' ' << TDerived::Name << " Complete");
- (Self->State.*CompleteMethod)(ctx);
+ (Self->State.*CompleteMethod)(ctx, Self->Info());
}
};
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp
index 9ea2fb90864..b39de392d84 100644
--- a/ydb/core/keyvalue/keyvalue_state.cpp
+++ b/ydb/core/keyvalue/keyvalue_state.cpp
@@ -668,7 +668,7 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
// corner case, if no CollectGarbage events were sent
if (InitialCollectsSent == 0) {
SendCutHistory(ctx);
- RegisterInitialGCCompletionComplete(ctx);
+ RegisterInitialGCCompletionComplete(ctx, info);
} else {
IsCollectEventSent = true;
}
@@ -693,9 +693,10 @@ void TKeyValueState::RegisterInitialGCCompletionExecute(ISimpleDb &db, const TAc
THelpers::DbUpdateState(StoredState, db, ctx);
}
-void TKeyValueState::RegisterInitialGCCompletionComplete(const TActorContext &ctx) {
+void TKeyValueState::RegisterInitialGCCompletionComplete(const TActorContext &ctx, const TTabletStorageInfo *info) {
IsCollectEventSent = false;
// initiate collection if trash was loaded from local base
+ ProcessPostponedTrims(ctx, info);
PrepareCollectIfNeeded(ctx);
}
@@ -1702,6 +1703,7 @@ void TKeyValueState::OnRequestComplete(ui64 requestUid, ui64 generation, ui64 st
CountRequestTakeOffOrEnqueue(requestType);
}
+ CmdTrimLeakedBlobsUids.erase(requestUid);
CancelInFlight(requestUid);
PrepareCollectIfNeeded(ctx);
}
@@ -2868,6 +2870,16 @@ void TKeyValueState::ProcessPostponedIntermediate(const TActorContext& ctx, THol
}
}
+void TKeyValueState::ProcessPostponedTrims(const TActorContext& ctx, const TTabletStorageInfo *info) {
+ if (!IsCollectEventSent && !InitialCollectsSent) {
+ for (auto& interm : CmdTrimLeakedBlobsPostponed) {
+ CmdTrimLeakedBlobsUids.insert(interm->RequestUid);
+ RegisterRequestActor(ctx, std::move(interm), info, ExecutorGeneration);
+ }
+ CmdTrimLeakedBlobsPostponed.clear();
+ }
+}
+
void TKeyValueState::OnEvReadRequest(TEvKeyValue::TEvRead::TPtr &ev, const TActorContext &ctx,
const TTabletStorageInfo *info)
{
@@ -3030,6 +3042,8 @@ void TKeyValueState::OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActor
bool hasReads = request.CmdReadSize() || request.CmdReadRangeSize();
+ bool hasTrim = request.HasCmdTrimLeakedBlobs();
+
TRequestType::EType requestType;
if (hasWrites) {
if (hasReads) {
@@ -3045,25 +3059,32 @@ void TKeyValueState::OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActor
if (PrepareIntermediate(ev, intermediate, requestType, ctx, info)) {
// Spawn KeyValueStorageRequest actor on the same thread
- if (requestType == TRequestType::WriteOnly) {
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
- << " Create storage request for WO, Marker# KV42");
- RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
- } else if (requestType == TRequestType::ReadOnlyInline) {
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
- << " Create storage request for RO_INLINE, Marker# KV45");
- RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
- ++RoInlineIntermediatesInFlight;
+ if (hasTrim && (IsCollectEventSent || InitialCollectsSent)) {
+ CmdTrimLeakedBlobsPostponed.push_back(std::move(intermediate));
} else {
- if (IntermediatesInFlight < IntermediatesInFlightLimit) {
+ if (hasTrim) {
+ CmdTrimLeakedBlobsUids.insert(intermediate->RequestUid);
+ }
+ if (requestType == TRequestType::WriteOnly) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
- << " Create storage request for RO/RW, Marker# KV43");
+ << " Create storage request for WO, Marker# KV42");
RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
- ++IntermediatesInFlight;
- } else {
+ } else if (requestType == TRequestType::ReadOnlyInline) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
- << " Enqueue storage request for RO/RW, Marker# KV44");
- PostponeIntermediate<TEvKeyValue::TEvRequest>(std::move(intermediate));
+ << " Create storage request for RO_INLINE, Marker# KV45");
+ RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
+ ++RoInlineIntermediatesInFlight;
+ } else {
+ if (IntermediatesInFlight < IntermediatesInFlightLimit) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
+ << " Create storage request for RO/RW, Marker# KV43");
+ RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
+ ++IntermediatesInFlight;
+ } else {
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
+ << " Enqueue storage request for RO/RW, Marker# KV44");
+ PostponeIntermediate<TEvKeyValue::TEvRequest>(std::move(intermediate));
+ }
}
}
diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h
index e7749f33904..17edd034494 100644
--- a/ydb/core/keyvalue/keyvalue_state.h
+++ b/ydb/core/keyvalue/keyvalue_state.h
@@ -256,6 +256,8 @@ protected:
TSet<TLogoBlobID> Trash;
TMap<ui64, ui64> InFlightForStep;
TMap<std::tuple<ui64, ui32>, ui32> RequestUidStepToCount;
+ THashSet<ui64> CmdTrimLeakedBlobsUids;
+ std::vector<THolder<TIntermediate>> CmdTrimLeakedBlobsPostponed;
THashMap<ui64, TInstant> RequestInputTime;
ui64 NextRequestUid = 1;
TIntrusivePtr<TCollectOperation> CollectOperation;
@@ -327,7 +329,7 @@ public:
const TActorContext &ctx, const TTabletStorageInfo *info);
bool RegisterInitialCollectResult(const TActorContext &ctx);
void RegisterInitialGCCompletionExecute(ISimpleDb &db, const TActorContext &ctx);
- void RegisterInitialGCCompletionComplete(const TActorContext &ctx);
+ void RegisterInitialGCCompletionComplete(const TActorContext &ctx, const TTabletStorageInfo *info);
void SendCutHistory(const TActorContext &ctx);
void OnInitQueueEmpty(const TActorContext &ctx);
void OnStateWork(const TActorContext &ctx);
@@ -337,13 +339,14 @@ public:
void DropRefCountsOnErrorInTx(std::deque<std::pair<TLogoBlobID, bool>>&& refCountsIncr, ISimpleDb& db, const TActorContext& ctx);
void DropRefCountsOnError(std::deque<std::pair<TLogoBlobID, bool>>& refCountsIncr /*in-out*/, bool writesMade,
const TActorContext& ctx);
+ void ProcessPostponedTrims(const TActorContext& ctx, const TTabletStorageInfo *info);
// garbage collection methods
void PrepareCollectIfNeeded(const TActorContext &ctx);
bool RemoveCollectedTrash(ISimpleDb &db, const TActorContext &ctx);
void UpdateStoredState(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep);
void CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx);
- void CompleteGCComplete(const TActorContext &ctx);
+ void CompleteGCComplete(const TActorContext &ctx, const TTabletStorageInfo *info);
void StartGC(const TActorContext &ctx, TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep,
TVector<TLogoBlobID>& trashGoingToCollect);
void StartCollectingIfPossible(const TActorContext &ctx);
diff --git a/ydb/core/keyvalue/keyvalue_state_collect.cpp b/ydb/core/keyvalue/keyvalue_state_collect.cpp
index 18f111ea04c..27e5ecadc8f 100644
--- a/ydb/core/keyvalue/keyvalue_state_collect.cpp
+++ b/ydb/core/keyvalue/keyvalue_state_collect.cpp
@@ -7,6 +7,11 @@ namespace NKeyValue {
void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "PrepareCollectIfNeeded KeyValue# " << TabletId << " Marker# KV61");
+ if (CmdTrimLeakedBlobsUids) {
+ // Do not start garbage collection while we are trimming to avoid race.
+ return;
+ }
+
if (IsCollectEventSent || InitialCollectsSent) {
// We already are trying to collect something, just pass this time.
return;
@@ -95,7 +100,7 @@ void TKeyValueState::CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx)
}
}
-void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) {
+void TKeyValueState::CompleteGCComplete(const TActorContext &ctx, const TTabletStorageInfo *info) {
if (RepeatGCTX) {
STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC20, "Repeat CompleteGC",
(TabletId, TabletId),
@@ -110,6 +115,7 @@ void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) {
STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC22, "CompleteGC Complete",
(TabletId, TabletId),
(TrashCount, Trash.size()));
+ ProcessPostponedTrims(ctx, info);
PrepareCollectIfNeeded(ctx);
}