diff options
author | Aleksandr Kriukov <kru.bash.all@gmail.com> | 2022-05-05 13:59:07 +0300 |
---|---|---|
committer | Aleksandr Kriukov <kru.bash.all@gmail.com> | 2022-05-05 13:59:07 +0300 |
commit | da89cd9e274efbfa5515ef1ebf67b617c4ef47ba (patch) | |
tree | a72a49318ef2b9c53f1f8111f69c0471ac2fb49a | |
parent | ef5630e88d8bf4b5ecc3854c7dc8a802adbad1a5 (diff) | |
download | ydb-da89cd9e274efbfa5515ef1ebf67b617c4ef47ba.tar.gz |
Move blobs from collect operation to trash during initialization, KIKIMR-14854
ref:b0023d2f62ef703b753fa82b98475abf22c0028c
-rw-r--r-- | ydb/core/keyvalue/keyvalue_flat_impl.h | 54 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 84 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 4 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_ut.cpp | 26 |
4 files changed, 113 insertions, 55 deletions
diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h index 859e07d4de9..7ab1c7cd43c 100644 --- a/ydb/core/keyvalue/keyvalue_flat_impl.h +++ b/ydb/core/keyvalue/keyvalue_flat_impl.h @@ -246,6 +246,28 @@ protected: } }; + + struct TTxRegisterInitialGCCompletion : public NTabletFlatExecutor::ITransaction { + TKeyValueFlat *Self; + + TTxRegisterInitialGCCompletion(TKeyValueFlat *keyValueFlat) + : Self(keyValueFlat) + {} + + bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << txc.Tablet << " TTxRegisterInitialGCCompletion Execute"); + TSimpleDbFlat db(txc.DB); + Self->State.RegisterInitialGCCompletionExecute(db, ctx); + return true; + } + + void Complete(const TActorContext &ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID() + << " TTxRegisterInitialGCCompletion Complete"); + Self->State.RegisterInitialGCCompletionComplete(ctx); + } + }; + TKeyValueState State; TDeque<TAutoPtr<IEventHandle>> InitialEventsQueue; TActorId CollectorActorId; @@ -373,23 +395,27 @@ protected: return; } - if (ev->Cookie == (ui64)TKeyValueState::ECollectCookie::SoftInitial) { - NKikimrProto::EReplyStatus status = ev->Get()->Status; - if (status == NKikimrProto::OK) { - State.RegisterInitialCollectResult(ctx); - } else { - LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() - << " received not ok TEvCollectGarbageResult" - << " Status# " << NKikimrProto::EReplyStatus_Name(status) - << " ErrorReason# " << ev->Get()->ErrorReason); - Send(SelfId(), new TKikimrEvents::TEvPoisonPill); - } + if (ev->Cookie != (ui64)TKeyValueState::ECollectCookie::SoftInitial) { + LOG_CRIT_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() + << " received TEvCollectGarbageResult with unexpected Cookie# " << ev->Cookie); + Send(SelfId(), new TKikimrEvents::TEvPoisonPill); return; } - LOG_CRIT_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() - << " received TEvCollectGarbageResult with unexpected Cookie# " << ev->Cookie); - Send(SelfId(), new TKikimrEvents::TEvPoisonPill); + NKikimrProto::EReplyStatus status = ev->Get()->Status; + if (status != NKikimrProto::OK) { + LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() + << " received not ok TEvCollectGarbageResult" + << " Status# " << NKikimrProto::EReplyStatus_Name(status) + << " ErrorReason# " << ev->Get()->ErrorReason); + Send(SelfId(), new TKikimrEvents::TEvPoisonPill); + return; + } + + bool isLast = State.RegisterInitialCollectResult(ctx); + if (isLast) { + Execute(new TTxRegisterInitialGCCompletion(this)); + } } void Handle(TEvKeyValue::TEvRequest::TPtr ev, const TActorContext &ctx) { diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index 96c00e8c0be..997fd31d1bc 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -575,6 +575,19 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e THashMap<TGroupChannel, THolder<TVector<TLogoBlobID>>> keepForGroupChannel; const ui32 barrierGeneration = executorGeneration - 1; const ui32 barrierStep = Max<ui32>(); + + auto addBlobToKeep = [&] (const TLogoBlobID &id) { + ui32 group = info->GroupFor(id.Channel(), id.Generation()); + Y_VERIFY(group != Max<ui32>(), "RefBlob# %s is mapped to an invalid group (-1)!", + id.ToString().c_str()); + TGroupChannel key(group, id.Channel()); + THolder<TVector<TLogoBlobID>> &ptr = keepForGroupChannel[key]; + if (!ptr) { + ptr = MakeHolder<TVector<TLogoBlobID>>(); + } + ptr->emplace_back(id); + }; + for (const auto &refInfo : RefCounts) { // Extract blob id and validate its channel const TLogoBlobID &id = refInfo.first; @@ -584,19 +597,16 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e const THelpers::TGenerationStep storedGenStep(StoredState.GetCollectGeneration(), StoredState.GetCollectStep()); // Mark with keep flag only new blobs if (storedGenStep < blobGenStep) { - const ui32 group = info->GroupFor(id.Channel(), id.Generation()); - Y_VERIFY(group != Max<ui32>(), "RefBlob# %s is mapped to an invalid group (-1)!", - id.ToString().c_str()); - const TGroupChannel key(group, id.Channel()); - auto it = keepForGroupChannel.find(key); - if (it == keepForGroupChannel.end()) { - bool isInserted = false; - std::tie(it, isInserted) = keepForGroupChannel.emplace(key, MakeHolder<TVector<TLogoBlobID>>()); - Y_VERIFY(isInserted); - } - it->second->emplace_back(id); + addBlobToKeep(id); } } + + if (CollectOperation) { + for (const TLogoBlobID &id : CollectOperation->Keep) { + addBlobToKeep(id); + } + } + for (const auto &channelInfo : info->Channels) { if (channelInfo.Channel < BLOB_CHANNEL) { continue; @@ -676,7 +686,7 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e ChannelRangeSets[id.Channel()].Remove(id.Generation()); } } - if (CollectOperation.Get()) { + if (CollectOperation) { auto &keep = CollectOperation->Keep; for (auto it = keep.begin(); it != keep.end(); ++it) { const TLogoBlobID &id = *it; @@ -696,38 +706,52 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e CollectOperation->Header.SetCollectGeneration(ExecutorGeneration); CollectOperation->Header.SetCollectStep(0); } + + if (CollectOperation) { + for (const TLogoBlobID &id : CollectOperation->DoNotKeep) { + Trash.insert(id); + THelpers::DbUpdateTrash(id, db, ctx); + } + THelpers::DbEraseCollect(db, ctx); + CollectOperation = nullptr; + } + + THelpers::DbUpdateState(StoredState, db, ctx); - IsCollectEventSent = true; // corner case, if no CollectGarbage events were sent if (InitialCollectsSent == 0) { SendCutHistory(ctx); - if (CollectOperation.Get()) { - // finish collect operation from local base - StoreCollectComplete(ctx); - } else { - // initiate collection if trash was loaded from local base - IsCollectEventSent = false; - PrepareCollectIfNeeded(ctx); - } + RegisterInitialGCCompletionComplete(ctx); + } else { + IsCollectEventSent = true; } } -void TKeyValueState::RegisterInitialCollectResult(const TActorContext &ctx) { + + +bool TKeyValueState::RegisterInitialCollectResult(const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " InitialCollectsSent# " << InitialCollectsSent << " Marker# KV50"); if (--InitialCollectsSent == 0) { SendCutHistory(ctx); - if (CollectOperation.Get()) { - // finish collect operation from local base - StoreCollectComplete(ctx); - } else { - IsCollectEventSent = false; - // initiate collection if trash was loaded from local base - PrepareCollectIfNeeded(ctx); - } + return true; } + return false; +} + + +void TKeyValueState::RegisterInitialGCCompletionExecute(ISimpleDb &db, const TActorContext &ctx) { + StoredState.SetCollectGeneration(ExecutorGeneration); + StoredState.SetCollectStep(0); + THelpers::DbUpdateState(StoredState, db, ctx); +} + +void TKeyValueState::RegisterInitialGCCompletionComplete(const TActorContext &ctx) { + IsCollectEventSent = false; + // initiate collection if trash was loaded from local base + PrepareCollectIfNeeded(ctx); } void TKeyValueState::SendCutHistory(const TActorContext &ctx) { diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index ffdc1f72504..a14dee8f233 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -318,7 +318,9 @@ public: void Load(const TString &key, const TString& value); void InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 executorGeneration, ISimpleDb &db, const TActorContext &ctx, const TTabletStorageInfo *info); - void RegisterInitialCollectResult(const TActorContext &ctx); + bool RegisterInitialCollectResult(const TActorContext &ctx); + void RegisterInitialGCCompletionExecute(ISimpleDb &db, const TActorContext &ctx); + void RegisterInitialGCCompletionComplete(const TActorContext &ctx); void SendCutHistory(const TActorContext &ctx); void OnInitQueueEmpty(const TActorContext &ctx); void OnStateWork(const TActorContext &ctx); diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp index a11c2f6b16c..021e3353bdf 100644 --- a/ydb/core/keyvalue/keyvalue_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_ut.cpp @@ -1,5 +1,6 @@ #include "defs.h" #include "keyvalue.h" +#include "keyvalue_flat_impl.h" #include "keyvalue_state.h" #include <ydb/public/lib/base/msgbus.h> #include <ydb/core/testlib/tablet_helpers.h> @@ -7,13 +8,14 @@ #include <util/random/fast.h> const bool ENABLE_DETAILED_KV_LOG = false; +const bool ENABLE_TESTLOG_OUTPUT = false; namespace NKikimr { namespace { template <typename... TArgs> void TestLog(TArgs&&... args) { - if constexpr (ENABLE_DETAILED_KV_LOG) { + if constexpr (ENABLE_TESTLOG_OUTPUT || ENABLE_DETAILED_KV_LOG) { Cerr << ((TStringBuilder() << ... << args) << Endl); } } @@ -84,7 +86,7 @@ struct TTestContext { Y_UNUSED(dispatchName); outActiveZone = false; Runtime.Reset(new TTestBasicRuntime); - Runtime->SetScheduledLimit(100); + Runtime->SetScheduledLimit(200); Runtime->SetLogPriority(NKikimrServices::KEYVALUE, NLog::PRI_DEBUG); SetupLogging(*Runtime); SetupTabletServices(*Runtime); @@ -1012,15 +1014,15 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbageResult::EventType) { if (collectStep == 2) { savedInitialEvents.push(event); + TestLog("Event drop; saved intial GCresult"); return TTestActorRuntime::EEventAction::DROP; } } - if (tabletActor && *tabletActor == event->Sender && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbage::EventType) { - } if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvKeyValue::TEvCollect::EventType) { switch (collectStep++) { case 1: { runtime.Send(new IEventHandle(event->Recipient, event->Recipient, new TKikimrEvents::TEvPoisonPill)); + TestLog("Event drop; Collect; Tablet was poisoned"); return TTestActorRuntime::EEventAction::DROP; } } @@ -1050,6 +1052,7 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0); TDispatchOptions options; options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType); + TestLog("First dispatch"); tc.Runtime->DispatchEvents(options); ExecuteWrite(tc, {{"key3", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); @@ -1059,9 +1062,11 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI bool onlyOneCollect = false; try { + TestLog("Second dispatch"); tc.Runtime->DispatchEvents(options2); } catch (NActors::TSchedulingLimitReachedException) { onlyOneCollect = true; + TestLog("Exception was catch"); } while (savedInitialEvents.size()) { @@ -1072,13 +1077,14 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI TDispatchOptions options3; options3.FinalEvents.push_back(TEvKeyValue::TEvEraseCollect::EventType); - tc.Runtime->DispatchEvents(options3); + TestLog("Third dispatch ", collectStep); + UNIT_ASSERT_VALUES_EQUAL(collectStep, 2); + + ExecuteWrite(tc, {{"key4", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + ExecuteWrite(tc, {{"key5", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + ExecuteWrite(tc, {{"key6", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); - try { - tc.Runtime->DispatchEvents(options3); - } catch (NActors::TSchedulingLimitReachedException) { - onlyOneCollect = true; - } + UNIT_ASSERT_VALUES_EQUAL(collectStep, 3); UNIT_ASSERT(onlyOneCollect); } |