diff options
author | kruall <kruall@ydb.tech> | 2022-07-08 13:40:28 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2022-07-08 13:40:28 +0300 |
commit | 635fcbaa8b2abae1672956b1ff43cd07ec7d0683 (patch) | |
tree | 4e2733a9a765e273ce687ddfe89532a60e951b33 | |
parent | c4d2bb05d4c15976327070de4b7669774da1129e (diff) | |
download | ydb-635fcbaa8b2abae1672956b1ff43cd07ec7d0683.tar.gz |
Limit count of blobs in one GC request,
-rw-r--r-- | ydb/core/keyvalue/keyvalue_collector.cpp | 210 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_collector_ut.cpp | 39 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_events.h | 17 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_flat_impl.h | 10 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 14 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state_collect.cpp | 92 |
6 files changed, 317 insertions, 65 deletions
diff --git a/ydb/core/keyvalue/keyvalue_collector.cpp b/ydb/core/keyvalue/keyvalue_collector.cpp index c4d9b12054..69b1e6fa1c 100644 --- a/ydb/core/keyvalue/keyvalue_collector.cpp +++ b/ydb/core/keyvalue/keyvalue_collector.cpp @@ -1,5 +1,7 @@ #include "keyvalue_flat_impl.h" + +#include <ydb/core/base/counters.h> #include <ydb/core/blobstorage/dsproxy/blobstorage_backoff.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -12,6 +14,7 @@ namespace NKeyValue { struct TGroupCollector { TDeque<TLogoBlobID> Keep; TDeque<TLogoBlobID> DoNotKeep; + ui32 Step = 0; }; class TKeyValueCollector : public TActorBootstrapped<TKeyValueCollector> { @@ -24,7 +27,19 @@ class TKeyValueCollector : public TActorBootstrapped<TKeyValueCollector> { ui64 CollectorErrors; bool IsSpringCleanup; - TMap<ui32, TMap<ui32, TGroupCollector>> CollectorForGroupForChannel; + // [channel][groupId] + TVector<TMap<ui32, TGroupCollector>> CollectorForGroupForChannel; + ui32 EndChannel = 0; + bool IsMultiStepMode = false; + TMap<ui32, TGroupCollector>::iterator CurrentChannelGroup; + + // For Keep + ui32 ChannelIdxInVector = 0; + TMaybe<THelpers::TGenerationStep> MinGenStepInCircle; + + // For DoNotKeep + TVector<TLogoBlobID> CollectedDoNotKeep; + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KEYVALUE_ACTOR; @@ -41,69 +56,118 @@ public: , BackoffTimer(CollectorErrorInitialBackoffMs, CollectorErrorMaxBackoffMs) , CollectorErrors(0) , IsSpringCleanup(isSpringCleanup) + , IsMultiStepMode(CollectOperation->Keep.size() + CollectOperation->DoNotKeep.size() > MaxCollectGarbageFlagsPerMessage) { Y_VERIFY(CollectOperation.Get()); } + ui32 GetVecIdxFromChannelIdx(ui32 channelIdx) { + return EndChannel - 1 - channelIdx; + } + + ui32 GetChannelIdxFromVecIdx(ui32 deqIdx) { + return EndChannel - 1 - deqIdx; + } + void Bootstrap(const TActorContext &ctx) { - ui32 endChannel = TabletInfo->Channels.size(); - for (ui32 channelIdx = BLOB_CHANNEL; channelIdx < endChannel; ++channelIdx) { + EndChannel = TabletInfo->Channels.size(); + CollectorForGroupForChannel.resize(EndChannel - BLOB_CHANNEL); + for (ui32 channelIdx = BLOB_CHANNEL; channelIdx < EndChannel; ++channelIdx) { const auto *channelInfo = TabletInfo->ChannelInfo(channelIdx); for (auto historyIt = channelInfo->History.begin(); historyIt != channelInfo->History.end(); ++historyIt) { if (IsSpringCleanup) { - CollectorForGroupForChannel[channelIdx][historyIt->GroupID]; + CollectorForGroupForChannel[GetVecIdxFromChannelIdx(channelIdx)][historyIt->GroupID]; } else { auto nextHistoryIt = historyIt; nextHistoryIt++; if (nextHistoryIt == channelInfo->History.end()) { - CollectorForGroupForChannel[channelIdx][historyIt->GroupID]; + CollectorForGroupForChannel[GetVecIdxFromChannelIdx(channelIdx)][historyIt->GroupID]; } } } } + if (IsMultiStepMode) { + CollectedDoNotKeep.reserve(MaxCollectGarbageFlagsPerMessage); + } + + Sort(CollectOperation->Keep); for (const auto &blob: CollectOperation->Keep) { ui32 groupId = TabletInfo->ChannelInfo(blob.Channel())->GroupForGeneration(blob.Generation()); Y_VERIFY(groupId != Max<ui32>(), "Keep Blob# %s is mapped to an invalid group (-1)!", blob.ToString().c_str()); - CollectorForGroupForChannel[blob.Channel()][groupId].Keep.push_back(blob); + CollectorForGroupForChannel[GetVecIdxFromChannelIdx(blob.Channel())][groupId].Keep.push_back(blob); } for (const auto &blob: CollectOperation->DoNotKeep) { const ui32 groupId = TabletInfo->ChannelInfo(blob.Channel())->GroupForGeneration(blob.Generation()); Y_VERIFY(groupId != Max<ui32>(), "DoNotKeep Blob# %s is mapped to an invalid group (-1)!", blob.ToString().c_str()); - CollectorForGroupForChannel[blob.Channel()][groupId].DoNotKeep.push_back(blob); + CollectorForGroupForChannel[GetVecIdxFromChannelIdx(blob.Channel())][groupId].DoNotKeep.push_back(blob); } + MinGenStepInCircle = THelpers::TGenerationStep(Max<ui32>(), Max<ui32>()); + ChannelIdxInVector = CollectorForGroupForChannel.size() - 1; + CurrentChannelGroup = CollectorForGroupForChannel.back().begin(); SendTheRequest(ctx); Become(&TThis::StateWait); } + bool ChangeGroup(const TActorContext &ctx) { + if (CollectorForGroupForChannel.back().empty()) { + while (CollectorForGroupForChannel.size() && CollectorForGroupForChannel.back().empty()) { + CollectorForGroupForChannel.pop_back(); + } + if (CollectorForGroupForChannel.empty()) { + ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCompleteGC()); + Die(ctx); + return true; + } + ChannelIdxInVector = CollectorForGroupForChannel.size() - 1; + CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin(); + } else { + do { + if (ChannelIdxInVector) { + ChannelIdxInVector--; + } else { + ChannelIdxInVector = CollectorForGroupForChannel.size() - 1; + CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin(); + SendPartitialCompleteGC(true); + return true; + } + } while (CollectorForGroupForChannel[ChannelIdxInVector].empty()); + CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin(); + } + return false; + } + void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev, const TActorContext &ctx) { + NKikimrProto::EReplyStatus status = ev->Get()->Status; if (status == NKikimrProto::OK) { // Success - CollectorForGroupForChannel.begin()->second.erase( - CollectorForGroupForChannel.begin()->second.begin()); - if (CollectorForGroupForChannel.begin()->second.empty()) { - CollectorForGroupForChannel.erase( - CollectorForGroupForChannel.begin()); + + bool isLastRequestInCollector = false; + { + TGroupCollector &collector = CurrentChannelGroup->second; + isLastRequestInCollector = (collector.Step == collector.Keep.size() + collector.DoNotKeep.size()); } - if (CollectorForGroupForChannel.empty()) { - ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCompleteGC()); - Die(ctx); - return; + if (isLastRequestInCollector) { + CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].erase(CurrentChannelGroup); + } else { + CurrentChannelGroup++; + } + if (CurrentChannelGroup == CollectorForGroupForChannel[ChannelIdxInVector].end()) { + if (ChangeGroup(ctx)) { + return; + } } SendTheRequest(ctx); return; } - Y_VERIFY(CollectorForGroupForChannel.begin() != CollectorForGroupForChannel.end()); - Y_VERIFY(CollectorForGroupForChannel.begin()->second.begin() != - CollectorForGroupForChannel.begin()->second.end()); - ui32 channelIdx = CollectorForGroupForChannel.begin()->first; - ui32 groupId = CollectorForGroupForChannel.begin()->second.begin()->first; + ui32 channelIdx = GetChannelIdxFromVecIdx(ChannelIdxInVector); + ui32 groupId = CurrentChannelGroup->first; CollectorErrors++; if (status == NKikimrProto::RACE || status == NKikimrProto::BLOCKED || status == NKikimrProto::NO_GROUP || CollectorErrors > CollectorMaxErrors) { @@ -134,11 +198,8 @@ public: void Handle(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx) { Y_UNUSED(ev); - Y_VERIFY(CollectorForGroupForChannel.begin() != CollectorForGroupForChannel.end()); - Y_VERIFY(CollectorForGroupForChannel.begin()->second.begin() != - CollectorForGroupForChannel.begin()->second.end()); - ui32 channelIdx = CollectorForGroupForChannel.begin()->first; - ui32 groupId = CollectorForGroupForChannel.begin()->second.begin()->first; + ui32 channelIdx = GetChannelIdxFromVecIdx(ChannelIdxInVector); + ui32 groupId = CurrentChannelGroup->first; LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID << " Collector retrying with" << " Group# " << groupId << " Channel# " << channelIdx @@ -153,36 +214,101 @@ public: return; } - void PrepareVector(const TDeque<TLogoBlobID> &in, THolder<TVector<TLogoBlobID>> &out) { - ui64 size = in.size(); - if (size) { - out.Reset(new TVector<TLogoBlobID>(size)); - ui64 outIdx = 0; - for (const auto &blob: in) { - (*out)[outIdx] = blob; - ++outIdx; - } - Y_VERIFY(outIdx == size); + void Handle(TEvKeyValue::TEvContinueGC::TPtr &ev) { + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID + << " Collector continue GC Marker# KVC04"); + MinGenStepInCircle = {}; + CollectedDoNotKeep = std::move(ev->Get()->Buffer); + CollectedDoNotKeep.clear(); + SendTheRequest(TActivationContext::AsActorContext()); + } + + void SendPartitialCompleteGC(bool endCircle) { + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID + << "end of round# " << (endCircle ? "yes" : "no") + << " Collector send PartitialCompleteGC Marker# KVC05"); + auto ev = std::make_unique<TEvKeyValue::TEvPartitialCompleteGC>(); + if (endCircle && MinGenStepInCircle) { + ev->CollectedGenerationStep = std::move(MinGenStepInCircle); } + ev->CollectedDoNotKeep = std::move(CollectedDoNotKeep); + + TActivationContext::Send(new IEventHandle(KeyValueActorId, SelfId(), ev.release())); } void SendTheRequest(const TActorContext &ctx) { THolder<TVector<TLogoBlobID>> keep; THolder<TVector<TLogoBlobID>> doNotKeep; - PrepareVector(CollectorForGroupForChannel.begin()->second.begin()->second.Keep, keep); - PrepareVector(CollectorForGroupForChannel.begin()->second.begin()->second.DoNotKeep, doNotKeep); - ui32 channelIdx = CollectorForGroupForChannel.begin()->first; - ui32 groupId = CollectorForGroupForChannel.begin()->second.begin()->first; + + TGroupCollector &collector = CurrentChannelGroup->second; + + ui32 doNotKeepSize = collector.DoNotKeep.size(); + if (collector.Step < doNotKeepSize) { + doNotKeepSize -= collector.Step; + } else { + doNotKeepSize = 0; + } + + if (doNotKeepSize && CollectedDoNotKeep.size() + doNotKeepSize > MaxCollectGarbageFlagsPerMessage) { + SendPartitialCompleteGC(false); + return; + } + + if (doNotKeepSize) { + doNotKeepSize = Min(doNotKeepSize, (ui32)MaxCollectGarbageFlagsPerMessage); + doNotKeep.Reset(new TVector<TLogoBlobID>(doNotKeepSize)); + auto begin = collector.DoNotKeep.begin() + collector.Step; + auto end = begin + doNotKeepSize; + + collector.Step += doNotKeepSize; + Copy(begin, end, doNotKeep->begin()); + Copy(doNotKeep->cbegin(), doNotKeep->cend(), std::back_inserter(CollectedDoNotKeep)); + } + + ui32 keepStartIdx = 0; + if (collector.Step >= collector.DoNotKeep.size()) { + keepStartIdx = collector.Step - collector.DoNotKeep.size(); + } + ui32 keepSize = Min(collector.Keep.size() - keepStartIdx, MaxCollectGarbageFlagsPerMessage - doNotKeepSize); + if (keepSize) { + keep.Reset(new TVector<TLogoBlobID>(keepSize)); + TMaybe<THelpers::TGenerationStep> collectedGenStep; + THelpers::TGenerationStep prevGenStep = THelpers::GenerationStep(collector.Keep.front()); + auto begin = collector.Keep.begin() + keepStartIdx; + auto end = begin + keepSize; + ui32 idx = 0; + for (auto it = begin; it != end; ++it, ++idx) { + THelpers::TGenerationStep genStep = THelpers::GenerationStep(*it); + if (prevGenStep != genStep) { + collectedGenStep = prevGenStep; + prevGenStep = genStep; + } + (*keep)[idx] = *it; + } + collector.Step += idx; + if (collectedGenStep && MinGenStepInCircle) { + MinGenStepInCircle = Min(*MinGenStepInCircle, *collectedGenStep); + } else if (collectedGenStep) { + MinGenStepInCircle = collectedGenStep; + } + } + + bool isLast = (collector.Keep.size() + collector.DoNotKeep.size() == collector.Step); + + ui32 collectGeneration = CollectOperation->Header.CollectGeneration; + ui32 collectStep = CollectOperation->Header.CollectStep; + ui32 channelIdx = GetChannelIdxFromVecIdx(CollectorForGroupForChannel.size() - 1); + ui32 groupId = CurrentChannelGroup->first; SendToBSProxy(ctx, groupId, new TEvBlobStorage::TEvCollectGarbage(TabletInfo->TabletID, RecordGeneration, PerGenerationCounter, - channelIdx, true, - CollectOperation->Header.CollectGeneration, CollectOperation->Header.CollectStep, + channelIdx, isLast, collectGeneration, collectStep, keep.Release(), doNotKeep.Release(), TInstant::Max(), true), (ui64)TKeyValueState::ECollectCookie::Soft); } STFUNC(StateWait) { switch (ev->GetTypeRewrite()) { HFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle); + hFunc(TEvKeyValue::TEvContinueGC, Handle); HFunc(TEvents::TEvWakeup, Handle); HFunc(TEvents::TEvPoisonPill, Handle); default: diff --git a/ydb/core/keyvalue/keyvalue_collector_ut.cpp b/ydb/core/keyvalue/keyvalue_collector_ut.cpp index e1704e8178..f414deca5c 100644 --- a/ydb/core/keyvalue/keyvalue_collector_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_collector_ut.cpp @@ -225,5 +225,44 @@ Y_UNIT_TEST(TestKeyValueCollectorMultiple) { UNIT_ASSERT(eraseCollect); } + +Y_UNIT_TEST(TestKeyValueCollectorMany) { + TContext context; + context.Setup(); + + TVector<TLogoBlobID> keep; + TVector<TLogoBlobID> doNotKeep; + doNotKeep.reserve(MaxCollectGarbageFlagsPerMessage + 1); + for (ui32 idx = 1; idx <= MaxCollectGarbageFlagsPerMessage + 1; ++idx) { + doNotKeep.emplace_back(0x10010000001000Bull, idx, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10); + keep.emplace_back(0x10010000001000Bull, idx, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10); + + } + + TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep))); + context.SetActor(CreateKeyValueCollector( + context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true)); + + for (ui32 idx = 0; idx < 7; ++idx) { + TAutoPtr<IEventHandle> handle; + auto collect = context.GrabEvent<TEvBlobStorage::TEvCollectGarbage>(handle); + UNIT_ASSERT(collect); + + context.Send(new TEvBlobStorage::TEvCollectGarbageResult(NKikimrProto::OK, collect->TabletId, + collect->RecordGeneration, collect->PerGenerationCounter, collect->Channel)); + + if (idx == 1 || idx == 5) { + auto complete = context.GrabEvent<TEvKeyValue::TEvPartitialCompleteGC>(handle); + complete->CollectedDoNotKeep.clear(); + auto cont = std::make_unique<TEvKeyValue::TEvContinueGC>(std::move(complete->CollectedDoNotKeep)); + context.Send(cont.release()); + } + } + + TAutoPtr<IEventHandle> handle; + auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvCompleteGC>(handle); + UNIT_ASSERT(eraseCollect); +} + } // TKeyValueCollectorTest } // NKikimr diff --git a/ydb/core/keyvalue/keyvalue_events.h b/ydb/core/keyvalue/keyvalue_events.h index d2f7931f8a..2b9a4f11a0 100644 --- a/ydb/core/keyvalue/keyvalue_events.h +++ b/ydb/core/keyvalue/keyvalue_events.h @@ -2,6 +2,7 @@ #include "defs.h" #include "keyvalue_intermediate.h" #include "keyvalue_request_stat.h" +#include "keyvalue_helpers.h" #include <ydb/public/lib/base/msgbus.h> #include <ydb/core/keyvalue/protos/events.pb.h> @@ -24,6 +25,8 @@ struct TEvKeyValue { EvReportWriteLatency, EvUpdateWeights, EvCompleteGC, + EvPartitialCompleteGC, + EvContinueGC, EvRead = EvRequest + 16, EvReadRange, @@ -198,6 +201,20 @@ struct TEvKeyValue { struct TEvCompleteGC : public TEventLocal<TEvCompleteGC, EvCompleteGC> { TEvCompleteGC() { } }; + + struct TEvPartitialCompleteGC : public TEventLocal<TEvPartitialCompleteGC, EvPartitialCompleteGC> { + TMaybe<NKeyValue::THelpers::TGenerationStep> CollectedGenerationStep; + TVector<TLogoBlobID> CollectedDoNotKeep; + + TEvPartitialCompleteGC() { } + }; + + struct TEvContinueGC : public TEventLocal<TEvContinueGC, EvContinueGC> { + TVector<TLogoBlobID> Buffer; + TEvContinueGC(TVector<TLogoBlobID> &&buffer) + : Buffer(std::move(buffer)) + { } + }; }; } // NKikimr diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h index e0d696da48..788c78fac9 100644 --- a/ydb/core/keyvalue/keyvalue_flat_impl.h +++ b/ydb/core/keyvalue/keyvalue_flat_impl.h @@ -246,6 +246,7 @@ protected: KV_SIMPLE_TX(EraseCollect); KV_SIMPLE_TX(RegisterInitialGCCompletion); KV_SIMPLE_TX(CompleteGC); + KV_SIMPLE_TX(PartitialCompleteGC); TKeyValueState State; TDeque<TAutoPtr<IEventHandle>> InitialEventsQueue; @@ -318,6 +319,12 @@ protected: Execute(new TTxCompleteGC(this), ctx); } + void Handle(TEvKeyValue::TEvPartitialCompleteGC::TPtr &ev, const TActorContext &ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() + << " Handle TEvPartitialCompleteGC " << ev->Get()->ToString()); + Execute(new TTxPartitialCompleteGC(this), ctx); + } + void Handle(TEvKeyValue::TEvCollect::TPtr &ev, const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() << " Handle TEvCollect " << ev->Get()->ToString()); @@ -332,7 +339,7 @@ protected: } CollectorActorId = ctx.RegisterWithSameMailbox(CreateKeyValueCollector(ctx.SelfID, State.GetCollectOperation(), Info(), Executor()->Generation(), State.GetPerGenerationCounter(), isSpringCleanup)); - State.OnEvCollectDone(perGenerationCounterStepSize, ctx); + State.OnEvCollectDone(perGenerationCounterStepSize, CollectorActorId, ctx); } else { LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() << " Handle TEvCollect: PerGenerationCounter overflow prevention restart."); @@ -509,6 +516,7 @@ public: HFunc(TEvKeyValue::TEvEraseCollect, Handle); HFunc(TEvKeyValue::TEvCompleteGC, Handle); + HFunc(TEvKeyValue::TEvPartitialCompleteGC, Handle); HFunc(TEvKeyValue::TEvCollect, Handle); HFunc(TEvKeyValue::TEvStoreCollect, Handle); HFunc(TEvKeyValue::TEvRequest, Handle); diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index 418be9d2a5..2c3b0e431f 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -265,6 +265,7 @@ protected: ui64 TabletId; TActorId KeyValueActorId; + TActorId CollectorActorId; ui32 ExecutorGeneration; bool IsStatePresent; bool IsEmptyDbStart; @@ -289,6 +290,9 @@ protected: NMetrics::TResourceMetrics* ResourceMetrics; + TMaybe<NKeyValue::THelpers::TGenerationStep> PartitialCollectedGenerationStep; + TVector<TLogoBlobID> PartitialCollectedDoNotKeep; + public: TKeyValueState(); void Clear(); @@ -330,22 +334,30 @@ public: // garbage collection methods void PrepareCollectIfNeeded(const TActorContext &ctx); + void RemoveFromTrashDoNotKeep(ISimpleDb &db, const TActorContext &ctx, const TVector<TLogoBlobID> &collectedDoNotKeep); + void RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep); + void UpdateStoredState(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep); void UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState); + void UpdateAfterPartitialGC(ISimpleDb &db, const TActorContext &ctx); void StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx); void StoreCollectComplete(const TActorContext &ctx); void EraseCollectExecute(ISimpleDb &db, const TActorContext &ctx); void EraseCollectComplete(const TActorContext &ctx); void CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx); void CompleteGCComplete(const TActorContext &ctx); + void PartitialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx); + void PartitialCompleteGCComplete(const TActorContext &ctx); void SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep); void StartGC(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep); void StartCollectingIfPossible(const TActorContext &ctx); ui64 OnEvCollect(const TActorContext &ctx); - void OnEvCollectDone(ui64 perGenerationCounterStepSize, const TActorContext &ctx); + void OnEvCollectDone(ui64 perGenerationCounterStepSize, TActorId collector, const TActorContext &ctx); void OnEvEraseCollect(const TActorContext &ctx); void OnEvCompleteGC(); + void OnEvPartitialCompleteGC(TEvKeyValue::TEvPartitialCompleteGC *ev); + void Reply(THolder<TIntermediate> &intermediate, const TActorContext &ctx, const TTabletStorageInfo *info); void ProcessCmd(TIntermediate::TRead &read, diff --git a/ydb/core/keyvalue/keyvalue_state_collect.cpp b/ydb/core/keyvalue/keyvalue_state_collect.cpp index 8458532b29..4651509c4c 100644 --- a/ydb/core/keyvalue/keyvalue_state_collect.cpp +++ b/ydb/core/keyvalue/keyvalue_state_collect.cpp @@ -49,6 +49,56 @@ void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) { } + +void TKeyValueState::RemoveFromTrashDoNotKeep(ISimpleDb &db, const TActorContext &ctx, + const TVector<TLogoBlobID> &collectedDoNotKeep) +{ + for (const TLogoBlobID &id : collectedDoNotKeep) { + THelpers::DbEraseTrash(id, db, ctx); + ui32 num = Trash.erase(id); + Y_VERIFY(num == 1); + CountTrashCollected(id.BlobSize()); + } +} + +void TKeyValueState::RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorContext &ctx, + const NKeyValue::THelpers::TGenerationStep &genStep) +{ + ui64 storedCollectGeneration = StoredState.GetCollectGeneration(); + ui64 storedCollectStep = StoredState.GetCollectStep(); + + // remove trash entries that were not marked as 'Keep' before, but which are automatically deleted by this barrier + // to prevent them from being added to 'DoNotKeep' list after + for (auto it = Trash.begin(); it != Trash.end(); ) { + THelpers::TGenerationStep trashGenStep = THelpers::GenerationStep(*it); + bool afterStoredSoftBarrier = trashGenStep > THelpers::TGenerationStep(storedCollectGeneration, storedCollectStep); + bool beforeSoftBarrier = trashGenStep <= genStep; + if (afterStoredSoftBarrier && beforeSoftBarrier) { + CountTrashCollected(it->BlobSize()); + THelpers::DbEraseTrash(*it, db, ctx); + it = Trash.erase(it); + } else { + ++it; + } + } +} + +void TKeyValueState::UpdateStoredState(ISimpleDb &db, const TActorContext &ctx, + const NKeyValue::THelpers::TGenerationStep &genStep) +{ + StoredState.SetCollectGeneration(std::get<0>(genStep)); + StoredState.SetCollectStep(std::get<1>(genStep)); + THelpers::DbUpdateState(StoredState, db, ctx); +} + +void TKeyValueState::UpdateAfterPartitialGC(ISimpleDb &db, const TActorContext &ctx) { + RemoveFromTrashDoNotKeep(db, ctx, PartitialCollectedDoNotKeep); + if (PartitialCollectedGenerationStep) { + RemoveFromTrashBySoftBarrier(db, ctx, *PartitialCollectedGenerationStep); + UpdateStoredState(db, ctx, *PartitialCollectedGenerationStep); + } +} + void TKeyValueState::UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState) { if (IsDamaged) { return; @@ -57,11 +107,9 @@ void TKeyValueState::UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool upda ui64 collectGeneration = CollectOperation->Header.GetCollectGeneration(); ui64 collectStep = CollectOperation->Header.GetCollectStep(); + auto collectGenStep = THelpers::TGenerationStep(collectGeneration, collectStep); if (updateTrash) { - ui64 storedCollectGeneration = StoredState.GetCollectGeneration(); - ui64 storedCollectStep = StoredState.GetCollectStep(); - for (TLogoBlobID &id: CollectOperation->DoNotKeep) { THelpers::DbEraseTrash(id, db, ctx); ui32 num = Trash.erase(id); @@ -69,27 +117,13 @@ void TKeyValueState::UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool upda CountTrashCollected(id.BlobSize()); } - // remove trash entries that were not marked as 'Keep' before, but which are automatically deleted by this barrier - // to prevent them from being added to 'DoNotKeep' list after - for (auto it = Trash.begin(); it != Trash.end(); ) { - THelpers::TGenerationStep trashGenStep = THelpers::GenerationStep(*it); - bool afterStoredSoftBarrier = trashGenStep > THelpers::TGenerationStep(storedCollectGeneration, storedCollectStep); - bool beforeSoftBarrier = trashGenStep <= THelpers::TGenerationStep(collectGeneration, collectStep); - if (afterStoredSoftBarrier && beforeSoftBarrier) { - CountTrashCollected(it->BlobSize()); - THelpers::DbEraseTrash(*it, db, ctx); - it = Trash.erase(it); - } else { - ++it; - } - } + RemoveFromTrashBySoftBarrier(db, ctx, collectGenStep); } if (updateState) { - StoredState.SetCollectGeneration(collectGeneration); - StoredState.SetCollectStep(collectStep); - THelpers::DbUpdateState(StoredState, db, ctx); + UpdateStoredState(db, ctx, collectGenStep); } + } void TKeyValueState::StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx) { @@ -150,6 +184,16 @@ void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) { PrepareCollectIfNeeded(ctx); } + +void TKeyValueState::PartitialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) { + UpdateAfterPartitialGC(db, ctx); +} + +void TKeyValueState::PartitialCompleteGCComplete(const TActorContext &ctx) { + PartitialCollectedDoNotKeep.clear(); + ctx.Send(CollectorActorId, new TEvKeyValue::TEvContinueGC(std::move(PartitialCollectedDoNotKeep))); +} + // Prepare the completely new full collect operation with the same gen/step, but with correct keep & doNotKeep lists void TKeyValueState::SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep) { @@ -242,10 +286,11 @@ ui64 TKeyValueState::OnEvCollect(const TActorContext &ctx) { return perGenerationCounterStepSize; } -void TKeyValueState::OnEvCollectDone(ui64 perGenerationCounterStepSize, const TActorContext &ctx) { +void TKeyValueState::OnEvCollectDone(ui64 perGenerationCounterStepSize, TActorId collector, const TActorContext &ctx) { Y_UNUSED(ctx); Y_VERIFY(perGenerationCounterStepSize >= 1); PerGenerationCounter += perGenerationCounterStepSize; + CollectorActorId = collector; } void TKeyValueState::OnEvEraseCollect(const TActorContext &ctx) { @@ -257,6 +302,11 @@ void TKeyValueState::OnEvCompleteGC() { CountLatencyBsCollect(); } +void TKeyValueState::OnEvPartitialCompleteGC(TEvKeyValue::TEvPartitialCompleteGC *ev) { + PartitialCollectedGenerationStep = std::move(ev->CollectedGenerationStep); + PartitialCollectedDoNotKeep = std::move(ev->CollectedDoNotKeep); +} + } // NKeyValue } // NKikimr |