summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Kriukov <[email protected]>2022-06-02 16:35:44 +0300
committerAleksandr Kriukov <[email protected]>2022-06-02 16:35:44 +0300
commitc768b09694655b942cfd5bc66db6c8c2ece24f75 (patch)
treec5f11fd149f509551b174b233612de06a427c902
parent57dd11ca4a9bdb907537c585ca65f1e067306824 (diff)
Stop to store collect operation, KIKIMR-14854
ref:66c3b20a58a68348ac2c6827a2bc55918a5cf5af
-rw-r--r--ydb/core/keyvalue/keyvalue_collector.cpp2
-rw-r--r--ydb/core/keyvalue/keyvalue_collector_ut.cpp6
-rw-r--r--ydb/core/keyvalue/keyvalue_events.h5
-rw-r--r--ydb/core/keyvalue/keyvalue_flat_impl.h79
-rw-r--r--ydb/core/keyvalue/keyvalue_state.h6
-rw-r--r--ydb/core/keyvalue/keyvalue_state_collect.cpp104
-rw-r--r--ydb/core/keyvalue/keyvalue_ut.cpp14
7 files changed, 126 insertions, 90 deletions
diff --git a/ydb/core/keyvalue/keyvalue_collector.cpp b/ydb/core/keyvalue/keyvalue_collector.cpp
index 6aa83dfd203..c4d9b120545 100644
--- a/ydb/core/keyvalue/keyvalue_collector.cpp
+++ b/ydb/core/keyvalue/keyvalue_collector.cpp
@@ -91,7 +91,7 @@ public:
CollectorForGroupForChannel.begin());
}
if (CollectorForGroupForChannel.empty()) {
- ctx.Send(KeyValueActorId, new TEvKeyValue::TEvEraseCollect());
+ ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCompleteGC());
Die(ctx);
return;
}
diff --git a/ydb/core/keyvalue/keyvalue_collector_ut.cpp b/ydb/core/keyvalue/keyvalue_collector_ut.cpp
index 4c7b59b4af5..e1704e81785 100644
--- a/ydb/core/keyvalue/keyvalue_collector_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_collector_ut.cpp
@@ -129,7 +129,7 @@ Y_UNIT_TEST(TestKeyValueCollectorEmpty) {
}
TAutoPtr<IEventHandle> handle;
- auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvEraseCollect>(handle);
+ auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvCompleteGC>(handle);
UNIT_ASSERT(eraseCollect);
}
@@ -165,7 +165,7 @@ Y_UNIT_TEST(TestKeyValueCollectorSingle) {
UNIT_ASSERT(erased == 1);
TAutoPtr<IEventHandle> handle;
- auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvEraseCollect>(handle);
+ auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvCompleteGC>(handle);
UNIT_ASSERT(eraseCollect);
}
@@ -221,7 +221,7 @@ Y_UNIT_TEST(TestKeyValueCollectorMultiple) {
UNIT_ASSERT(erased == 8);
TAutoPtr<IEventHandle> handle;
- auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvEraseCollect>(handle);
+ auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvCompleteGC>(handle);
UNIT_ASSERT(eraseCollect);
}
diff --git a/ydb/core/keyvalue/keyvalue_events.h b/ydb/core/keyvalue/keyvalue_events.h
index 2c8c23256a7..d2f7931f8a1 100644
--- a/ydb/core/keyvalue/keyvalue_events.h
+++ b/ydb/core/keyvalue/keyvalue_events.h
@@ -23,6 +23,7 @@ struct TEvKeyValue {
EvPeriodicRefresh,
EvReportWriteLatency,
EvUpdateWeights,
+ EvCompleteGC,
EvRead = EvRequest + 16,
EvReadRange,
@@ -193,6 +194,10 @@ struct TEvKeyValue {
struct TEvPeriodicRefresh : public TEventLocal<TEvPeriodicRefresh, EvPeriodicRefresh> {
TEvPeriodicRefresh() { }
};
+
+ struct TEvCompleteGC : public TEventLocal<TEvCompleteGC, EvCompleteGC> {
+ TEvCompleteGC() { }
+ };
};
} // NKikimr
diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h
index 7ab1c7cd43c..e0d696da488 100644
--- a/ydb/core/keyvalue/keyvalue_flat_impl.h
+++ b/ydb/core/keyvalue/keyvalue_flat_impl.h
@@ -204,69 +204,48 @@ protected:
}
};
- struct TTxStoreCollect : public NTabletFlatExecutor::ITransaction {
- TKeyValueFlat *Self;
-
- TTxStoreCollect(TKeyValueFlat *keyValueFlat)
- : Self(keyValueFlat)
- {}
-
- bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override {
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << txc.Tablet << " TTxStoreCollect Execute");
- TSimpleDbFlat db(txc.DB);
- Self->State.StoreCollectExecute(db, ctx);
- return true;
- }
+ using TExecuteMethod = void (TKeyValueState::*)(ISimpleDb &db, const TActorContext &ctx);
+ using TCompleteMethod = void (TKeyValueState::*)(const TActorContext &ctx);
- void Complete(const TActorContext &ctx) override {
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID()
- << " TTxStoreCollect Complete");
- Self->State.StoreCollectComplete(ctx);
- }
- };
-
- struct TTxEraseCollect : public NTabletFlatExecutor::ITransaction {
+ template <typename TDerived, TExecuteMethod ExecuteMethod, TCompleteMethod CompleteMethod>
+ struct TTxUniversal : NTabletFlatExecutor::ITransaction {
TKeyValueFlat *Self;
- TTxEraseCollect(TKeyValueFlat *keyValueFlat)
+ TTxUniversal(TKeyValueFlat *keyValueFlat)
: Self(keyValueFlat)
{}
bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override {
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << txc.Tablet << " TTxEraseCollect Execute");
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << txc.Tablet << ' ' << TDerived::Name << " Execute");
TSimpleDbFlat db(txc.DB);
- Self->State.EraseCollectExecute(db, ctx);
+ (Self->State.*ExecuteMethod)(db, ctx);
return true;
}
void Complete(const TActorContext &ctx) override {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID()
- << " TTxEraseCollect Complete");
- Self->State.EraseCollectComplete(ctx);
+ << ' ' << TDerived::Name << " Complete");
+ (Self->State.*CompleteMethod)(ctx);
}
};
+#ifdef KV_SIMPLE_TX
+#error "KV_SIMPLE_TX already exist"
+#else
+#define KV_SIMPLE_TX(name) \
+ struct TTx ## name : public TTxUniversal<TTx ## name, \
+ &TKeyValueState:: name ## Execute, \
+ &TKeyValueState:: name ## Complete> \
+ { \
+ static constexpr auto Name = "TTx" #name; \
+ using TTxUniversal::TTxUniversal; \
+ }
+#endif
- struct TTxRegisterInitialGCCompletion : public NTabletFlatExecutor::ITransaction {
- TKeyValueFlat *Self;
-
- TTxRegisterInitialGCCompletion(TKeyValueFlat *keyValueFlat)
- : Self(keyValueFlat)
- {}
-
- bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override {
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << txc.Tablet << " TTxRegisterInitialGCCompletion Execute");
- TSimpleDbFlat db(txc.DB);
- Self->State.RegisterInitialGCCompletionExecute(db, ctx);
- return true;
- }
-
- void Complete(const TActorContext &ctx) override {
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID()
- << " TTxRegisterInitialGCCompletion Complete");
- Self->State.RegisterInitialGCCompletionComplete(ctx);
- }
- };
+ KV_SIMPLE_TX(StoreCollect);
+ KV_SIMPLE_TX(EraseCollect);
+ KV_SIMPLE_TX(RegisterInitialGCCompletion);
+ KV_SIMPLE_TX(CompleteGC);
TKeyValueState State;
TDeque<TAutoPtr<IEventHandle>> InitialEventsQueue;
@@ -332,6 +311,13 @@ protected:
Execute(new TTxEraseCollect(this), ctx);
}
+ void Handle(TEvKeyValue::TEvCompleteGC::TPtr &ev, const TActorContext &ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
+ << " Handle TEvCompleteGC " << ev->Get()->ToString());
+ State.OnEvCompleteGC();
+ Execute(new TTxCompleteGC(this), ctx);
+ }
+
void Handle(TEvKeyValue::TEvCollect::TPtr &ev, const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
<< " Handle TEvCollect " << ev->Get()->ToString());
@@ -522,6 +508,7 @@ public:
hFunc(TEvKeyValue::TEvAcquireLock, Handle);
HFunc(TEvKeyValue::TEvEraseCollect, Handle);
+ HFunc(TEvKeyValue::TEvCompleteGC, Handle);
HFunc(TEvKeyValue::TEvCollect, Handle);
HFunc(TEvKeyValue::TEvStoreCollect, Handle);
HFunc(TEvKeyValue::TEvRequest, Handle);
diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h
index a14dee8f233..418be9d2a54 100644
--- a/ydb/core/keyvalue/keyvalue_state.h
+++ b/ydb/core/keyvalue/keyvalue_state.h
@@ -330,16 +330,22 @@ public:
// garbage collection methods
void PrepareCollectIfNeeded(const TActorContext &ctx);
+ void UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState);
void StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx);
void StoreCollectComplete(const TActorContext &ctx);
void EraseCollectExecute(ISimpleDb &db, const TActorContext &ctx);
void EraseCollectComplete(const TActorContext &ctx);
+ void CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx);
+ void CompleteGCComplete(const TActorContext &ctx);
void SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep);
+ void StartGC(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
+ TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep);
void StartCollectingIfPossible(const TActorContext &ctx);
ui64 OnEvCollect(const TActorContext &ctx);
void OnEvCollectDone(ui64 perGenerationCounterStepSize, const TActorContext &ctx);
void OnEvEraseCollect(const TActorContext &ctx);
+ void OnEvCompleteGC();
void Reply(THolder<TIntermediate> &intermediate, const TActorContext &ctx, const TTabletStorageInfo *info);
void ProcessCmd(TIntermediate::TRead &read,
diff --git a/ydb/core/keyvalue/keyvalue_state_collect.cpp b/ydb/core/keyvalue/keyvalue_state_collect.cpp
index 248d62fd3e6..8458532b293 100644
--- a/ydb/core/keyvalue/keyvalue_state_collect.cpp
+++ b/ydb/core/keyvalue/keyvalue_state_collect.cpp
@@ -48,6 +48,50 @@ void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) {
StartCollectingIfPossible(ctx);
}
+
+void TKeyValueState::UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState) {
+ if (IsDamaged) {
+ return;
+ }
+ Y_VERIFY(CollectOperation);
+
+ ui64 collectGeneration = CollectOperation->Header.GetCollectGeneration();
+ ui64 collectStep = CollectOperation->Header.GetCollectStep();
+
+ if (updateTrash) {
+ ui64 storedCollectGeneration = StoredState.GetCollectGeneration();
+ ui64 storedCollectStep = StoredState.GetCollectStep();
+
+ for (TLogoBlobID &id: CollectOperation->DoNotKeep) {
+ THelpers::DbEraseTrash(id, db, ctx);
+ ui32 num = Trash.erase(id);
+ Y_VERIFY(num == 1);
+ CountTrashCollected(id.BlobSize());
+ }
+
+ // remove trash entries that were not marked as 'Keep' before, but which are automatically deleted by this barrier
+ // to prevent them from being added to 'DoNotKeep' list after
+ for (auto it = Trash.begin(); it != Trash.end(); ) {
+ THelpers::TGenerationStep trashGenStep = THelpers::GenerationStep(*it);
+ bool afterStoredSoftBarrier = trashGenStep > THelpers::TGenerationStep(storedCollectGeneration, storedCollectStep);
+ bool beforeSoftBarrier = trashGenStep <= THelpers::TGenerationStep(collectGeneration, collectStep);
+ if (afterStoredSoftBarrier && beforeSoftBarrier) {
+ CountTrashCollected(it->BlobSize());
+ THelpers::DbEraseTrash(*it, db, ctx);
+ it = Trash.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
+
+ if (updateState) {
+ StoredState.SetCollectGeneration(collectGeneration);
+ StoredState.SetCollectStep(collectStep);
+ THelpers::DbUpdateState(StoredState, db, ctx);
+ }
+}
+
void TKeyValueState::StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "StoreCollectExecute KeyValue# " << TabletId
<< " IsDamaged# " << IsDamaged << " Marker# KV62");
@@ -66,30 +110,7 @@ void TKeyValueState::StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx
CollectOperation->DoNotKeep,
db, ctx);
- // We don't need to keep the trash list anymore
- for (TLogoBlobID &id: CollectOperation->DoNotKeep) {
- THelpers::DbEraseTrash(id, db, ctx);
- const ui32 num = Trash.erase(id);
- Y_VERIFY(num == 1);
- CountTrashCollected(id.BlobSize());
- }
-
- // remove trash entries that were not marked as 'Keep' before, but which are automatically deleted by this barrier
- // to prevent them from being added to 'DoNotKeep' list after
- for (auto it = Trash.begin(); it != Trash.end(); ) {
- const THelpers::TGenerationStep trashGenStep = THelpers::GenerationStep(*it);
- ui64 storedCollectGeneration = StoredState.GetCollectGeneration();
- ui64 storedCollectStep = StoredState.GetCollectStep();
- const bool a = trashGenStep > THelpers::TGenerationStep(storedCollectGeneration, storedCollectStep);
- const bool b = trashGenStep <= THelpers::TGenerationStep(collectGen, collectStep);
- if (a && b) {
- CountTrashCollected(it->BlobSize());
- THelpers::DbEraseTrash(*it, db, ctx);
- it = Trash.erase(it);
- } else {
- ++it;
- }
- }
+ UpdateGC(db, ctx, true, false);
}
void TKeyValueState::StoreCollectComplete(const TActorContext &ctx) {
@@ -102,18 +123,15 @@ void TKeyValueState::EraseCollectExecute(ISimpleDb &db, const TActorContext &ctx
if (IsDamaged) {
return;
}
- Y_VERIFY(CollectOperation.Get());
+ Y_VERIFY(CollectOperation);
// Erase the collect operation
THelpers::DbEraseCollect(db, ctx);
// Update the state
- StoredState.SetCollectGeneration(CollectOperation->Header.GetCollectGeneration());
- StoredState.SetCollectStep(CollectOperation->Header.GetCollectStep());
- THelpers::DbUpdateState(StoredState, db, ctx);
+ UpdateGC(db, ctx, false, true);
}
void TKeyValueState::EraseCollectComplete(const TActorContext &ctx) {
- Y_UNUSED(ctx);
- Y_VERIFY(CollectOperation.Get());
+ Y_VERIFY(CollectOperation);
CollectOperation.Reset(nullptr);
IsCollectEventSent = false;
@@ -121,6 +139,17 @@ void TKeyValueState::EraseCollectComplete(const TActorContext &ctx) {
PrepareCollectIfNeeded(ctx);
}
+void TKeyValueState::CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) {
+ UpdateGC(db, ctx, true, true);
+}
+
+void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) {
+ Y_VERIFY(CollectOperation);
+ CollectOperation.Reset(nullptr);
+ IsCollectEventSent = false;
+ PrepareCollectIfNeeded(ctx);
+}
+
// Prepare the completely new full collect operation with the same gen/step, but with correct keep & doNotKeep lists
void TKeyValueState::SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep) {
@@ -130,6 +159,15 @@ void TKeyValueState::SendStoreCollect(const TActorContext &ctx, const THelpers::
ctx.Send(KeyValueActorId, new TEvKeyValue::TEvStoreCollect());
}
+void TKeyValueState::StartGC(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
+ TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep)
+{
+ ui32 generation, step;
+ std::tie(generation, step) = genStep;
+ CollectOperation.Reset(new TCollectOperation(generation, step, std::move(keep), std::move(doNotKeep)));
+ ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCollect());
+}
+
void TKeyValueState::StartCollectingIfPossible(const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "StartCollectingIfPossible KeyValue# " << TabletId
<< " IsCollectEventSent# " << IsCollectEventSent << " Marker# KV64");
@@ -188,7 +226,7 @@ void TKeyValueState::StartCollectingIfPossible(const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "StartCollectingIfPossible KeyValue# " << TabletId
<< "Flags Keep.Size# " << keep.size() << " DoNotKeep.Size# " << doNotKeep.size() << " Marker# KV65");
- SendStoreCollect(ctx, collectGenStep, keep, doNotKeep);
+ StartGC(ctx, collectGenStep, keep, doNotKeep);
}
ui64 TKeyValueState::OnEvCollect(const TActorContext &ctx) {
@@ -215,6 +253,10 @@ void TKeyValueState::OnEvEraseCollect(const TActorContext &ctx) {
CountLatencyBsCollect();
}
+void TKeyValueState::OnEvCompleteGC() {
+ CountLatencyBsCollect();
+}
+
} // NKeyValue
} // NKikimr
diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp
index 2b5eb69c82a..ca3089ac351 100644
--- a/ydb/core/keyvalue/keyvalue_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_ut.cpp
@@ -989,13 +989,13 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) {
tc.Prepare(INITIAL_TEST_DISPATCH_NAME, setup, activeZone);
ExecuteWrite(tc, {{"key", "value"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
tc.Runtime->Send(new IEventHandle(*tabletActor, *tabletActor, new TKikimrEvents::TEvPoisonPill));
+ TestLog("After the first death");
ExecuteWrite(tc, {{"key1", "value1"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
ExecuteWrite(tc, {{"key2", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
ExecuteRead(tc, "key", "value", 0, 0, 0);
+ TestLog("Before delete range");
ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0);
- TDispatchOptions options;
- options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType);
- tc.Runtime->DispatchEvents(options);
+ TestLog("After delete range");
ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0);
ExecuteRead(tc, "key1", "value1", 0, 0, 0);
}
@@ -1050,10 +1050,6 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI
ExecuteWrite(tc, {{"key2", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
ExecuteRead(tc, "key", "value", 0, 0, 0);
ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0);
- TDispatchOptions options;
- options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType);
- TestLog("First dispatch");
- tc.Runtime->DispatchEvents(options);
ExecuteWrite(tc, {{"key3", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
@@ -1079,7 +1075,7 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI
TestLog("Third dispatch ", collectStep);
UNIT_ASSERT_VALUES_EQUAL(collectStep, 2);
-
+
ExecuteWrite(tc, {{"key4", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
ExecuteWrite(tc, {{"key5", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
ExecuteWrite(tc, {{"key6", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
@@ -2066,7 +2062,7 @@ Y_UNIT_TEST(TestRenameWorks) {
}
-Y_UNIT_TEST(TestRenameWorksewApi) {
+Y_UNIT_TEST(TestRenameWorksNewApi) {
TTestContext tc;
RunTestWithReboots(tc.TabletIds, [&]() {
return tc.InitialEventsFilter.Prepare();