diff options
author | Aleksandr Kriukov <[email protected]> | 2022-06-02 16:35:44 +0300 |
---|---|---|
committer | Aleksandr Kriukov <[email protected]> | 2022-06-02 16:35:44 +0300 |
commit | c768b09694655b942cfd5bc66db6c8c2ece24f75 (patch) | |
tree | c5f11fd149f509551b174b233612de06a427c902 | |
parent | 57dd11ca4a9bdb907537c585ca65f1e067306824 (diff) |
Stop to store collect operation, KIKIMR-14854
ref:66c3b20a58a68348ac2c6827a2bc55918a5cf5af
-rw-r--r-- | ydb/core/keyvalue/keyvalue_collector.cpp | 2 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_collector_ut.cpp | 6 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_events.h | 5 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_flat_impl.h | 79 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 6 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state_collect.cpp | 104 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_ut.cpp | 14 |
7 files changed, 126 insertions, 90 deletions
diff --git a/ydb/core/keyvalue/keyvalue_collector.cpp b/ydb/core/keyvalue/keyvalue_collector.cpp index 6aa83dfd203..c4d9b120545 100644 --- a/ydb/core/keyvalue/keyvalue_collector.cpp +++ b/ydb/core/keyvalue/keyvalue_collector.cpp @@ -91,7 +91,7 @@ public: CollectorForGroupForChannel.begin()); } if (CollectorForGroupForChannel.empty()) { - ctx.Send(KeyValueActorId, new TEvKeyValue::TEvEraseCollect()); + ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCompleteGC()); Die(ctx); return; } diff --git a/ydb/core/keyvalue/keyvalue_collector_ut.cpp b/ydb/core/keyvalue/keyvalue_collector_ut.cpp index 4c7b59b4af5..e1704e81785 100644 --- a/ydb/core/keyvalue/keyvalue_collector_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_collector_ut.cpp @@ -129,7 +129,7 @@ Y_UNIT_TEST(TestKeyValueCollectorEmpty) { } TAutoPtr<IEventHandle> handle; - auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvEraseCollect>(handle); + auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvCompleteGC>(handle); UNIT_ASSERT(eraseCollect); } @@ -165,7 +165,7 @@ Y_UNIT_TEST(TestKeyValueCollectorSingle) { UNIT_ASSERT(erased == 1); TAutoPtr<IEventHandle> handle; - auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvEraseCollect>(handle); + auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvCompleteGC>(handle); UNIT_ASSERT(eraseCollect); } @@ -221,7 +221,7 @@ Y_UNIT_TEST(TestKeyValueCollectorMultiple) { UNIT_ASSERT(erased == 8); TAutoPtr<IEventHandle> handle; - auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvEraseCollect>(handle); + auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvCompleteGC>(handle); UNIT_ASSERT(eraseCollect); } diff --git a/ydb/core/keyvalue/keyvalue_events.h b/ydb/core/keyvalue/keyvalue_events.h index 2c8c23256a7..d2f7931f8a1 100644 --- a/ydb/core/keyvalue/keyvalue_events.h +++ b/ydb/core/keyvalue/keyvalue_events.h @@ -23,6 +23,7 @@ struct TEvKeyValue { EvPeriodicRefresh, EvReportWriteLatency, EvUpdateWeights, + EvCompleteGC, EvRead = EvRequest + 16, EvReadRange, @@ -193,6 +194,10 @@ struct TEvKeyValue { struct TEvPeriodicRefresh : public TEventLocal<TEvPeriodicRefresh, EvPeriodicRefresh> { TEvPeriodicRefresh() { } }; + + struct TEvCompleteGC : public TEventLocal<TEvCompleteGC, EvCompleteGC> { + TEvCompleteGC() { } + }; }; } // NKikimr diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h index 7ab1c7cd43c..e0d696da488 100644 --- a/ydb/core/keyvalue/keyvalue_flat_impl.h +++ b/ydb/core/keyvalue/keyvalue_flat_impl.h @@ -204,69 +204,48 @@ protected: } }; - struct TTxStoreCollect : public NTabletFlatExecutor::ITransaction { - TKeyValueFlat *Self; - - TTxStoreCollect(TKeyValueFlat *keyValueFlat) - : Self(keyValueFlat) - {} - - bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << txc.Tablet << " TTxStoreCollect Execute"); - TSimpleDbFlat db(txc.DB); - Self->State.StoreCollectExecute(db, ctx); - return true; - } + using TExecuteMethod = void (TKeyValueState::*)(ISimpleDb &db, const TActorContext &ctx); + using TCompleteMethod = void (TKeyValueState::*)(const TActorContext &ctx); - void Complete(const TActorContext &ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID() - << " TTxStoreCollect Complete"); - Self->State.StoreCollectComplete(ctx); - } - }; - - struct TTxEraseCollect : public NTabletFlatExecutor::ITransaction { + template <typename TDerived, TExecuteMethod ExecuteMethod, TCompleteMethod CompleteMethod> + struct TTxUniversal : NTabletFlatExecutor::ITransaction { TKeyValueFlat *Self; - TTxEraseCollect(TKeyValueFlat *keyValueFlat) + TTxUniversal(TKeyValueFlat *keyValueFlat) : Self(keyValueFlat) {} bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << txc.Tablet << " TTxEraseCollect Execute"); + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << txc.Tablet << ' ' << TDerived::Name << " Execute"); TSimpleDbFlat db(txc.DB); - Self->State.EraseCollectExecute(db, ctx); + (Self->State.*ExecuteMethod)(db, ctx); return true; } void Complete(const TActorContext &ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID() - << " TTxEraseCollect Complete"); - Self->State.EraseCollectComplete(ctx); + << ' ' << TDerived::Name << " Complete"); + (Self->State.*CompleteMethod)(ctx); } }; +#ifdef KV_SIMPLE_TX +#error "KV_SIMPLE_TX already exist" +#else +#define KV_SIMPLE_TX(name) \ + struct TTx ## name : public TTxUniversal<TTx ## name, \ + &TKeyValueState:: name ## Execute, \ + &TKeyValueState:: name ## Complete> \ + { \ + static constexpr auto Name = "TTx" #name; \ + using TTxUniversal::TTxUniversal; \ + } +#endif - struct TTxRegisterInitialGCCompletion : public NTabletFlatExecutor::ITransaction { - TKeyValueFlat *Self; - - TTxRegisterInitialGCCompletion(TKeyValueFlat *keyValueFlat) - : Self(keyValueFlat) - {} - - bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << txc.Tablet << " TTxRegisterInitialGCCompletion Execute"); - TSimpleDbFlat db(txc.DB); - Self->State.RegisterInitialGCCompletionExecute(db, ctx); - return true; - } - - void Complete(const TActorContext &ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID() - << " TTxRegisterInitialGCCompletion Complete"); - Self->State.RegisterInitialGCCompletionComplete(ctx); - } - }; + KV_SIMPLE_TX(StoreCollect); + KV_SIMPLE_TX(EraseCollect); + KV_SIMPLE_TX(RegisterInitialGCCompletion); + KV_SIMPLE_TX(CompleteGC); TKeyValueState State; TDeque<TAutoPtr<IEventHandle>> InitialEventsQueue; @@ -332,6 +311,13 @@ protected: Execute(new TTxEraseCollect(this), ctx); } + void Handle(TEvKeyValue::TEvCompleteGC::TPtr &ev, const TActorContext &ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() + << " Handle TEvCompleteGC " << ev->Get()->ToString()); + State.OnEvCompleteGC(); + Execute(new TTxCompleteGC(this), ctx); + } + void Handle(TEvKeyValue::TEvCollect::TPtr &ev, const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() << " Handle TEvCollect " << ev->Get()->ToString()); @@ -522,6 +508,7 @@ public: hFunc(TEvKeyValue::TEvAcquireLock, Handle); HFunc(TEvKeyValue::TEvEraseCollect, Handle); + HFunc(TEvKeyValue::TEvCompleteGC, Handle); HFunc(TEvKeyValue::TEvCollect, Handle); HFunc(TEvKeyValue::TEvStoreCollect, Handle); HFunc(TEvKeyValue::TEvRequest, Handle); diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index a14dee8f233..418be9d2a54 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -330,16 +330,22 @@ public: // garbage collection methods void PrepareCollectIfNeeded(const TActorContext &ctx); + void UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState); void StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx); void StoreCollectComplete(const TActorContext &ctx); void EraseCollectExecute(ISimpleDb &db, const TActorContext &ctx); void EraseCollectComplete(const TActorContext &ctx); + void CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx); + void CompleteGCComplete(const TActorContext &ctx); void SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep); + void StartGC(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, + TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep); void StartCollectingIfPossible(const TActorContext &ctx); ui64 OnEvCollect(const TActorContext &ctx); void OnEvCollectDone(ui64 perGenerationCounterStepSize, const TActorContext &ctx); void OnEvEraseCollect(const TActorContext &ctx); + void OnEvCompleteGC(); void Reply(THolder<TIntermediate> &intermediate, const TActorContext &ctx, const TTabletStorageInfo *info); void ProcessCmd(TIntermediate::TRead &read, diff --git a/ydb/core/keyvalue/keyvalue_state_collect.cpp b/ydb/core/keyvalue/keyvalue_state_collect.cpp index 248d62fd3e6..8458532b293 100644 --- a/ydb/core/keyvalue/keyvalue_state_collect.cpp +++ b/ydb/core/keyvalue/keyvalue_state_collect.cpp @@ -48,6 +48,50 @@ void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) { StartCollectingIfPossible(ctx); } + +void TKeyValueState::UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState) { + if (IsDamaged) { + return; + } + Y_VERIFY(CollectOperation); + + ui64 collectGeneration = CollectOperation->Header.GetCollectGeneration(); + ui64 collectStep = CollectOperation->Header.GetCollectStep(); + + if (updateTrash) { + ui64 storedCollectGeneration = StoredState.GetCollectGeneration(); + ui64 storedCollectStep = StoredState.GetCollectStep(); + + for (TLogoBlobID &id: CollectOperation->DoNotKeep) { + THelpers::DbEraseTrash(id, db, ctx); + ui32 num = Trash.erase(id); + Y_VERIFY(num == 1); + CountTrashCollected(id.BlobSize()); + } + + // remove trash entries that were not marked as 'Keep' before, but which are automatically deleted by this barrier + // to prevent them from being added to 'DoNotKeep' list after + for (auto it = Trash.begin(); it != Trash.end(); ) { + THelpers::TGenerationStep trashGenStep = THelpers::GenerationStep(*it); + bool afterStoredSoftBarrier = trashGenStep > THelpers::TGenerationStep(storedCollectGeneration, storedCollectStep); + bool beforeSoftBarrier = trashGenStep <= THelpers::TGenerationStep(collectGeneration, collectStep); + if (afterStoredSoftBarrier && beforeSoftBarrier) { + CountTrashCollected(it->BlobSize()); + THelpers::DbEraseTrash(*it, db, ctx); + it = Trash.erase(it); + } else { + ++it; + } + } + } + + if (updateState) { + StoredState.SetCollectGeneration(collectGeneration); + StoredState.SetCollectStep(collectStep); + THelpers::DbUpdateState(StoredState, db, ctx); + } +} + void TKeyValueState::StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx) { LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "StoreCollectExecute KeyValue# " << TabletId << " IsDamaged# " << IsDamaged << " Marker# KV62"); @@ -66,30 +110,7 @@ void TKeyValueState::StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx CollectOperation->DoNotKeep, db, ctx); - // We don't need to keep the trash list anymore - for (TLogoBlobID &id: CollectOperation->DoNotKeep) { - THelpers::DbEraseTrash(id, db, ctx); - const ui32 num = Trash.erase(id); - Y_VERIFY(num == 1); - CountTrashCollected(id.BlobSize()); - } - - // remove trash entries that were not marked as 'Keep' before, but which are automatically deleted by this barrier - // to prevent them from being added to 'DoNotKeep' list after - for (auto it = Trash.begin(); it != Trash.end(); ) { - const THelpers::TGenerationStep trashGenStep = THelpers::GenerationStep(*it); - ui64 storedCollectGeneration = StoredState.GetCollectGeneration(); - ui64 storedCollectStep = StoredState.GetCollectStep(); - const bool a = trashGenStep > THelpers::TGenerationStep(storedCollectGeneration, storedCollectStep); - const bool b = trashGenStep <= THelpers::TGenerationStep(collectGen, collectStep); - if (a && b) { - CountTrashCollected(it->BlobSize()); - THelpers::DbEraseTrash(*it, db, ctx); - it = Trash.erase(it); - } else { - ++it; - } - } + UpdateGC(db, ctx, true, false); } void TKeyValueState::StoreCollectComplete(const TActorContext &ctx) { @@ -102,18 +123,15 @@ void TKeyValueState::EraseCollectExecute(ISimpleDb &db, const TActorContext &ctx if (IsDamaged) { return; } - Y_VERIFY(CollectOperation.Get()); + Y_VERIFY(CollectOperation); // Erase the collect operation THelpers::DbEraseCollect(db, ctx); // Update the state - StoredState.SetCollectGeneration(CollectOperation->Header.GetCollectGeneration()); - StoredState.SetCollectStep(CollectOperation->Header.GetCollectStep()); - THelpers::DbUpdateState(StoredState, db, ctx); + UpdateGC(db, ctx, false, true); } void TKeyValueState::EraseCollectComplete(const TActorContext &ctx) { - Y_UNUSED(ctx); - Y_VERIFY(CollectOperation.Get()); + Y_VERIFY(CollectOperation); CollectOperation.Reset(nullptr); IsCollectEventSent = false; @@ -121,6 +139,17 @@ void TKeyValueState::EraseCollectComplete(const TActorContext &ctx) { PrepareCollectIfNeeded(ctx); } +void TKeyValueState::CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) { + UpdateGC(db, ctx, true, true); +} + +void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) { + Y_VERIFY(CollectOperation); + CollectOperation.Reset(nullptr); + IsCollectEventSent = false; + PrepareCollectIfNeeded(ctx); +} + // Prepare the completely new full collect operation with the same gen/step, but with correct keep & doNotKeep lists void TKeyValueState::SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep) { @@ -130,6 +159,15 @@ void TKeyValueState::SendStoreCollect(const TActorContext &ctx, const THelpers:: ctx.Send(KeyValueActorId, new TEvKeyValue::TEvStoreCollect()); } +void TKeyValueState::StartGC(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, + TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep) +{ + ui32 generation, step; + std::tie(generation, step) = genStep; + CollectOperation.Reset(new TCollectOperation(generation, step, std::move(keep), std::move(doNotKeep))); + ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCollect()); +} + void TKeyValueState::StartCollectingIfPossible(const TActorContext &ctx) { LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "StartCollectingIfPossible KeyValue# " << TabletId << " IsCollectEventSent# " << IsCollectEventSent << " Marker# KV64"); @@ -188,7 +226,7 @@ void TKeyValueState::StartCollectingIfPossible(const TActorContext &ctx) { LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "StartCollectingIfPossible KeyValue# " << TabletId << "Flags Keep.Size# " << keep.size() << " DoNotKeep.Size# " << doNotKeep.size() << " Marker# KV65"); - SendStoreCollect(ctx, collectGenStep, keep, doNotKeep); + StartGC(ctx, collectGenStep, keep, doNotKeep); } ui64 TKeyValueState::OnEvCollect(const TActorContext &ctx) { @@ -215,6 +253,10 @@ void TKeyValueState::OnEvEraseCollect(const TActorContext &ctx) { CountLatencyBsCollect(); } +void TKeyValueState::OnEvCompleteGC() { + CountLatencyBsCollect(); +} + } // NKeyValue } // NKikimr diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp index 2b5eb69c82a..ca3089ac351 100644 --- a/ydb/core/keyvalue/keyvalue_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_ut.cpp @@ -989,13 +989,13 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) { tc.Prepare(INITIAL_TEST_DISPATCH_NAME, setup, activeZone); ExecuteWrite(tc, {{"key", "value"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); tc.Runtime->Send(new IEventHandle(*tabletActor, *tabletActor, new TKikimrEvents::TEvPoisonPill)); + TestLog("After the first death"); ExecuteWrite(tc, {{"key1", "value1"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); ExecuteWrite(tc, {{"key2", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); ExecuteRead(tc, "key", "value", 0, 0, 0); + TestLog("Before delete range"); ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0); - TDispatchOptions options; - options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType); - tc.Runtime->DispatchEvents(options); + TestLog("After delete range"); ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0); ExecuteRead(tc, "key1", "value1", 0, 0, 0); } @@ -1050,10 +1050,6 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI ExecuteWrite(tc, {{"key2", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); ExecuteRead(tc, "key", "value", 0, 0, 0); ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0); - TDispatchOptions options; - options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType); - TestLog("First dispatch"); - tc.Runtime->DispatchEvents(options); ExecuteWrite(tc, {{"key3", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); @@ -1079,7 +1075,7 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI TestLog("Third dispatch ", collectStep); UNIT_ASSERT_VALUES_EQUAL(collectStep, 2); - + ExecuteWrite(tc, {{"key4", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); ExecuteWrite(tc, {{"key5", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); ExecuteWrite(tc, {{"key6", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); @@ -2066,7 +2062,7 @@ Y_UNIT_TEST(TestRenameWorks) { } -Y_UNIT_TEST(TestRenameWorksewApi) { +Y_UNIT_TEST(TestRenameWorksNewApi) { TTestContext tc; RunTestWithReboots(tc.TabletIds, [&]() { return tc.InitialEventsFilter.Prepare(); |