diff options
author | kruall <kruall@ydb.tech> | 2022-09-14 15:13:40 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2022-09-14 15:13:40 +0300 |
commit | 93f18fccde2eeff3db4784ee8e37bad102a8d5da (patch) | |
tree | 11e1ba2c3740bcbb5e1512d4436f01aae87cb90c | |
parent | 211d3af96ecbc839df06d53a1868a66bd6741323 (diff) | |
download | ydb-93f18fccde2eeff3db4784ee8e37bad102a8d5da.tar.gz |
Split GC with blob without keep flag,
-rw-r--r-- | ydb/core/keyvalue/keyvalue_collector.cpp | 300 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_collector_ut.cpp | 34 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_events.h | 11 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_flat_impl.h | 9 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 13 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state_collect.cpp | 54 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_ut.cpp | 19 |
7 files changed, 237 insertions, 203 deletions
diff --git a/ydb/core/keyvalue/keyvalue_collector.cpp b/ydb/core/keyvalue/keyvalue_collector.cpp index 0cac380251..25dc777d24 100644 --- a/ydb/core/keyvalue/keyvalue_collector.cpp +++ b/ydb/core/keyvalue/keyvalue_collector.cpp @@ -31,12 +31,6 @@ class TKeyValueCollector : public TActorBootstrapped<TKeyValueCollector> { // [channel][groupId] TVector<TMap<ui32, TGroupCollector>> CollectorForGroupForChannel; ui32 EndChannel = 0; - bool IsMultiStepMode = false; - TMap<ui32, TGroupCollector>::iterator CurrentChannelGroup; - - // For Keep - ui32 ChannelIdxInVector = 0; - TMaybe<THelpers::TGenerationStep> MinGenStepInCircle; // For DoNotKeep TVector<TLogoBlobID> CollectedDoNotKeep; @@ -57,7 +51,6 @@ public: , BackoffTimer(CollectorErrorInitialBackoffMs, CollectorErrorMaxBackoffMs) , CollectorErrors(0) , IsSpringCleanup(isSpringCleanup) - , IsMultiStepMode(CollectOperation->Keep.size() + CollectOperation->DoNotKeep.size() > MaxCollectGarbageFlagsPerMessage) { Y_VERIFY(CollectOperation.Get()); } @@ -66,11 +59,11 @@ public: return EndChannel - 1 - channelIdx; } - ui32 GetChannelIdxFromVecIdx(ui32 deqIdx) { + ui32 GetChannelIdFromVecIdx(ui32 deqIdx) { return EndChannel - 1 - deqIdx; } - void Bootstrap(const TActorContext &ctx) { + void Bootstrap() { EndChannel = TabletInfo->Channels.size(); CollectorForGroupForChannel.resize(EndChannel - BLOB_CHANNEL); for (ui32 channelIdx = BLOB_CHANNEL; channelIdx < EndChannel; ++channelIdx) { @@ -88,12 +81,8 @@ public: } } - if (IsMultiStepMode) { - CollectedDoNotKeep.reserve(MaxCollectGarbageFlagsPerMessage); - } - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC04, "Start KeyValueCollector", - (TabletId, TabletInfo->TabletID), (IsMultiStepMode, IsMultiStepMode), + (TabletId, TabletInfo->TabletID), (Keep, CollectOperation->Keep.size()), (DoNotKeep, CollectOperation->DoNotKeep.size())); Sort(CollectOperation->Keep); @@ -109,171 +98,138 @@ public: blob.ToString().c_str()); CollectorForGroupForChannel[GetVecIdxFromChannelIdx(blob.Channel())][groupId].DoNotKeep.push_back(blob); } + ui64 maxDoNotKeepSizeInGroupChannel = 0; + for (auto &groups : CollectorForGroupForChannel) { + for (auto &[groupId, collector] : groups) { + maxDoNotKeepSizeInGroupChannel = Max(maxDoNotKeepSizeInGroupChannel, collector.DoNotKeep.size()); + } + } + CollectedDoNotKeep.reserve(Min(maxDoNotKeepSizeInGroupChannel, MaxCollectGarbageFlagsPerMessage)); - MinGenStepInCircle = THelpers::TGenerationStep(Max<ui32>(), Max<ui32>()); - ChannelIdxInVector = CollectorForGroupForChannel.size() - 1; - CurrentChannelGroup = CollectorForGroupForChannel.back().begin(); - SendTheRequest(ctx); + SendTheRequest(); Become(&TThis::StateWait); } - bool ChangeChannel(const TActorContext &ctx) { - if (CollectorForGroupForChannel.back().empty()) { - while (CollectorForGroupForChannel.size() && CollectorForGroupForChannel.back().empty()) { - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC09, "Empty channel, it's erased", - (TabletId, TabletInfo->TabletID), - (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector))); - CollectorForGroupForChannel.pop_back(); - } - if (CollectorForGroupForChannel.empty()) { - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC06, "Send TEvCompleteGC and die", - (TabletId, TabletInfo->TabletID), - (ErasedGroupId, CurrentChannelGroup->first), - (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector))); - ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCompleteGC()); - Die(ctx); - return true; - } - ChannelIdxInVector = CollectorForGroupForChannel.size() - 1; - CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin(); - } else { - do { - if (ChannelIdxInVector) { - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC07, "Move to next channel", - (TabletId, TabletInfo->TabletID), - (ErasedGroupId, CurrentChannelGroup->first), - (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector))); - ChannelIdxInVector--; - } else { - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC10, "End of round; Send PartitialCompleteGC", - (TabletId, TabletInfo->TabletID), - (ErasedGroupId, CurrentChannelGroup->first), - (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector)), - (CollectedDoNotKeep, CollectedDoNotKeep.size())); - ChannelIdxInVector = CollectorForGroupForChannel.size() - 1; - CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin(); - SendPartitialCompleteGC(true); - return true; - } - } while (CollectorForGroupForChannel[ChannelIdxInVector].empty()); - CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin(); - } - - return false; + ui32 GetCurretChannelId() { + return GetChannelIdFromVecIdx(CollectorForGroupForChannel.size() - 1); } - void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev, const TActorContext &ctx) { + void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev) { NKikimrProto::EReplyStatus status = ev->Get()->Status; STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC11, "Receive TEvCollectGarbageResult", (TabletId, TabletInfo->TabletID), (Status, status)); + auto collectorsOfCurrentChannel = CollectorForGroupForChannel.rbegin(); + if (collectorsOfCurrentChannel == CollectorForGroupForChannel.rend()) { + STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC17, + "Collectors must be exist when we recieve TEvCollectGarbageResult", + (TabletId, TabletInfo->TabletID), (CollectorErrors, CollectorErrors)); + HandleErrorAndDie(); + return; + } + + auto currentCollectorIterator = collectorsOfCurrentChannel->begin(); + if (currentCollectorIterator == collectorsOfCurrentChannel->end()) { + STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC17, + "Collectors must be exist in the current channel when we recieve TEvCollectGarbageResult", + (TabletId, TabletInfo->TabletID), (Channel, GetCurretChannelId()), (CollectorErrors, CollectorErrors)); + HandleErrorAndDie(); + return; + } + if (status == NKikimrProto::OK) { // Success - bool isLastRequestInCollector = false; { - TGroupCollector &collector = CurrentChannelGroup->second; + TGroupCollector &collector = currentCollectorIterator->second; isLastRequestInCollector = (collector.Step == collector.Keep.size() + collector.DoNotKeep.size()); } if (isLastRequestInCollector) { STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC08, "Last group was empty, it's erased", - (TabletId, TabletInfo->TabletID), - (Status, status), - (ErasedGroupId, CurrentChannelGroup->first)); - CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].erase(CurrentChannelGroup); - } else { - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC08, "Move to next group, it's erased", - (TabletId, TabletInfo->TabletID), - (Status, status), - (ErasedGroupId, CurrentChannelGroup->first)); - CurrentChannelGroup++; + (TabletId, TabletInfo->TabletID), (GroupId, currentCollectorIterator->first), (Channel, GetCurretChannelId())); + currentCollectorIterator = collectorsOfCurrentChannel->erase(currentCollectorIterator); } - if (CurrentChannelGroup == CollectorForGroupForChannel[ChannelIdxInVector].end()) { - if (ChangeChannel(ctx)) { - return; - } + if (currentCollectorIterator == collectorsOfCurrentChannel->end()) { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC08, "Last channel was empty, it's erased", + (TabletId, TabletInfo->TabletID), (Channel, GetCurretChannelId())); + CollectorForGroupForChannel.pop_back(); } - SendTheRequest(ctx); + if (CollectedDoNotKeep.size()) { + SendPartialCompleteGC(); + return; + } + if (CollectorForGroupForChannel.empty()) { + SendCompleteGCAndDie(); + return; + } + SendTheRequest(); return; } - ui32 channelIdx = GetChannelIdxFromVecIdx(ChannelIdxInVector); - ui32 groupId = CurrentChannelGroup->first; + ui32 channelId = GetCurretChannelId(); + ui32 groupId = currentCollectorIterator->first; CollectorErrors++; if (status == NKikimrProto::RACE || status == NKikimrProto::BLOCKED || status == NKikimrProto::NO_GROUP || CollectorErrors > CollectorMaxErrors) { - LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE_GC, "Tablet# " << TabletInfo->TabletID - << " Collector got Status# " << NKikimrProto::EReplyStatus_Name(status) - << " from Group# " << groupId << " Channel# " << channelIdx - << " CollectorErrors# " << CollectorErrors - << " Marker# KVC01"); - // Die - ctx.Send(KeyValueActorId, new TEvents::TEvPoisonPill()); - Die(ctx); + STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC01, "Collector got not OK status", + (TabletId, TabletInfo->TabletID), (GroupId, groupId), (Channel, channelId), (Status, status), + (CollectorErrors, CollectorErrors), (CollectorMaxErrors, CollectorMaxErrors)); + HandleErrorAndDie(); return; } // Rertry ui64 backoffMs = BackoffTimer.NextBackoffMs(); + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC02, "Collector got not OK status, retry", + (TabletId, TabletInfo->TabletID), (GroupId, groupId), (Channel, channelId), + (Status, status), (BackoffMs, backoffMs), (RetryingImmediately, (backoffMs ? "no" : "yes"))); if (backoffMs) { const TDuration &timeout = TDuration::MilliSeconds(backoffMs); - ctx.Schedule(timeout, new TEvents::TEvWakeup()); + TActivationContext::Schedule(timeout, new IEventHandle(TEvents::TEvWakeup::EventType, 0, SelfId(), SelfId(), nullptr, 0)); } else { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE_GC, "Tablet# " << TabletInfo->TabletID - << " Collector got Status# " << NKikimrProto::EReplyStatus_Name(status) - << " from Group# " << groupId << " Channel# " << channelIdx - << " Retrying immediately. Marker# KVC02"); - SendTheRequest(ctx); + SendTheRequest(); } } - void Handle(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx) { - Y_UNUSED(ev); - ui32 channelIdx = GetChannelIdxFromVecIdx(ChannelIdxInVector); - ui32 groupId = CurrentChannelGroup->first; - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE_GC, "Tablet# " << TabletInfo->TabletID - << " Collector retrying with" - << " Group# " << groupId << " Channel# " << channelIdx - << " Marker# KVC03"); - SendTheRequest(ctx); - return; - } - - void Handle(TEvents::TEvPoisonPill::TPtr &ev, const TActorContext &ctx) { - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC12, "Poisoned", - (TabletId, TabletInfo->TabletID)); - Y_UNUSED(ev); - Die(ctx); - return; + void SendPartialCompleteGC() { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC14, "Collector send PartialCompleteGC", + (TabletId, TabletInfo->TabletID), + (FirstDoNotKeep, (CollectedDoNotKeep.size() ? CollectedDoNotKeep[0].ToString() : "none")), + (CollectedDoNotKeepSize, CollectedDoNotKeep.size())); + Send(KeyValueActorId, new TEvKeyValue::TEvPartialCompleteGC(std::move(CollectedDoNotKeep))); } - void Handle(TEvKeyValue::TEvContinueGC::TPtr &ev) { - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC13, "Collector continue GC", + void SendCompleteGCAndDie() { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC19, "Collector send CompleteGC", (TabletId, TabletInfo->TabletID)); - MinGenStepInCircle = {}; - CollectedDoNotKeep = std::move(ev->Get()->Buffer); - CollectedDoNotKeep.clear(); - SendTheRequest(TActivationContext::AsActorContext()); + Send(KeyValueActorId, new TEvKeyValue::TEvCompleteGC()); + PassAway(); } - void SendPartitialCompleteGC(bool endCircle) { - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC14, "Collector send PartitialCompleteGC", - (TabletId, TabletInfo->TabletID), (EndOfRound, (endCircle ? "yes" : "no"))); - auto ev = std::make_unique<TEvKeyValue::TEvPartitialCompleteGC>(); - if (endCircle && MinGenStepInCircle) { - ev->CollectedGenerationStep = std::move(MinGenStepInCircle); + void SendTheRequest() { + auto collectorsOfCurrentChannel = CollectorForGroupForChannel.rbegin(); + if (collectorsOfCurrentChannel == CollectorForGroupForChannel.rend()) { + STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC30, + "Collectors must be exist when we try to send the request", + (TabletId, TabletInfo->TabletID), (CollectorErrors, CollectorErrors)); + HandleErrorAndDie(); + return; } - ev->CollectedDoNotKeep = std::move(CollectedDoNotKeep); - TActivationContext::Send(new IEventHandle(KeyValueActorId, SelfId(), ev.release())); - } + auto currentCollectorIterator = collectorsOfCurrentChannel->begin(); + if (currentCollectorIterator == collectorsOfCurrentChannel->end()) { + STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC31, + "Collectors must be exist in the current channel we try to send the request", + (TabletId, TabletInfo->TabletID), (Channel, GetCurretChannelId()), (CollectorErrors, CollectorErrors)); + HandleErrorAndDie(); + return; + } - void SendTheRequest(const TActorContext &ctx) { THolder<TVector<TLogoBlobID>> keep; THolder<TVector<TLogoBlobID>> doNotKeep; - TGroupCollector &collector = CurrentChannelGroup->second; + TGroupCollector &collector = currentCollectorIterator->second; ui32 doNotKeepSize = collector.DoNotKeep.size(); if (collector.Step < doNotKeepSize) { @@ -282,16 +238,6 @@ public: doNotKeepSize = 0; } - if (CollectedDoNotKeep.size() && doNotKeepSize && CollectedDoNotKeep.size() + doNotKeepSize > MaxCollectGarbageFlagsPerMessage) { - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC15, "CollectedDoNotKeep was oevrflow; Send PartitialCompleteGC", - (TabletId, TabletInfo->TabletID), - (doNotKeepSize, doNotKeepSize), - (CollectedDoNotKeep.size, CollectedDoNotKeep.size()), - (MaxCollectGarbageFlagsPerMessage, MaxCollectGarbageFlagsPerMessage)); - SendPartitialCompleteGC(false); - return; - } - if (doNotKeepSize) { doNotKeepSize = Min(doNotKeepSize, (ui32)MaxCollectGarbageFlagsPerMessage); doNotKeep.Reset(new TVector<TLogoBlobID>(doNotKeepSize)); @@ -324,37 +270,83 @@ public: (*keep)[idx] = *it; } collector.Step += idx; - if (collectedGenStep && MinGenStepInCircle) { - MinGenStepInCircle = Min(*MinGenStepInCircle, *collectedGenStep); - } else if (collectedGenStep) { - MinGenStepInCircle = collectedGenStep; - } } bool isLast = (collector.Keep.size() + collector.DoNotKeep.size() == collector.Step); ui32 collectGeneration = CollectOperation->Header.CollectGeneration; ui32 collectStep = CollectOperation->Header.CollectStep; - ui32 channelIdx = GetChannelIdxFromVecIdx(CollectorForGroupForChannel.size() - 1); - ui32 groupId = CurrentChannelGroup->first; + ui32 channelIdx = GetChannelIdFromVecIdx(CollectorForGroupForChannel.size() - 1); + ui32 groupId = currentCollectorIterator->first; - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC16, "Send GC request", + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC16, "Send GC request", (TabletId, TabletInfo->TabletID), (CollectGeneration, collectGeneration), (CollectStep, collectStep), (ChannelIdx, channelIdx), (GroupId, groupId), (KeepSize, keepSize), (DoNotKeepSize, doNotKeepSize), (IsLast, isLast)); - SendToBSProxy(ctx, groupId, + SendToBSProxy(TActivationContext::AsActorContext(), groupId, new TEvBlobStorage::TEvCollectGarbage(TabletInfo->TabletID, RecordGeneration, PerGenerationCounter, channelIdx, isLast, collectGeneration, collectStep, keep.Release(), doNotKeep.Release(), TInstant::Max(), true), (ui64)TKeyValueState::ECollectCookie::Soft); } - STFUNC(StateWait) { + void HandleContinueGC(TEvKeyValue::TEvContinueGC::TPtr &ev) { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC13, "Collector continue GC", + (TabletId, TabletInfo->TabletID)); + if (CollectorForGroupForChannel.empty()) { + SendCompleteGCAndDie(); + return; + } + CollectedDoNotKeep = std::move(ev->Get()->Buffer); + CollectedDoNotKeep.clear(); + SendTheRequest(); + } + + void HandleWakeUp() { + auto collectorsOfCurrentChannel = CollectorForGroupForChannel.rbegin(); + if (collectorsOfCurrentChannel == CollectorForGroupForChannel.rend()) { + STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC17, + "Collectors must be exist when we try to resend the request", + (TabletId, TabletInfo->TabletID), (CollectorErrors, CollectorErrors)); + HandleErrorAndDie(); + return; + } + + auto currentCollectorIterator = collectorsOfCurrentChannel->begin(); + if (currentCollectorIterator == collectorsOfCurrentChannel->end()) { + STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC17, + "Collectors must be exist in the current channel we try to resend the request", + (TabletId, TabletInfo->TabletID), (Channel, GetCurretChannelId()), (CollectorErrors, CollectorErrors)); + HandleErrorAndDie(); + return; + } + + ui32 channelId = GetCurretChannelId(); + ui32 groupId = currentCollectorIterator->first; + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC03, "Collector retrying", + (TabletId, TabletInfo->TabletID), (GroupId, groupId), (Channel, channelId)); + SendTheRequest(); + } + + void HandlePoisonPill() { + STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC12, "Poisoned", + (TabletId, TabletInfo->TabletID)); + PassAway(); + } + + void HandleErrorAndDie() { + STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC18, "Garbage Collector catch the error, send PoisonPill to the tablet", + (TabletId, TabletInfo->TabletID)); + Send(KeyValueActorId, new TEvents::TEvPoisonPill()); + PassAway(); + } + + STATEFN(StateWait) { switch (ev->GetTypeRewrite()) { - HFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle); - hFunc(TEvKeyValue::TEvContinueGC, Handle); - HFunc(TEvents::TEvWakeup, Handle); - HFunc(TEvents::TEvPoisonPill, Handle); + hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle); + hFunc(TEvKeyValue::TEvContinueGC, HandleContinueGC); + cFunc(TEvents::TEvWakeup::EventType, HandleWakeUp); + cFunc(TEvents::TEvPoisonPill::EventType, HandlePoisonPill); default: break; } diff --git a/ydb/core/keyvalue/keyvalue_collector_ut.cpp b/ydb/core/keyvalue/keyvalue_collector_ut.cpp index f414deca5c..835f1be056 100644 --- a/ydb/core/keyvalue/keyvalue_collector_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_collector_ut.cpp @@ -217,6 +217,13 @@ Y_UNIT_TEST(TestKeyValueCollectorMultiple) { context.Send(new TEvBlobStorage::TEvCollectGarbageResult(NKikimrProto::OK, collect->TabletId, collect->RecordGeneration, collect->PerGenerationCounter, collect->Channel)); + + if (collect && collect->DoNotKeep && collect->DoNotKeep->size()) { + auto complete = context.GrabEvent<TEvKeyValue::TEvPartialCompleteGC>(handle); + complete->CollectedDoNotKeep.clear(); + auto cont = std::make_unique<TEvKeyValue::TEvContinueGC>(std::move(complete->CollectedDoNotKeep)); + context.Send(cont.release()); + } } UNIT_ASSERT(erased == 8); @@ -232,32 +239,43 @@ Y_UNIT_TEST(TestKeyValueCollectorMany) { TVector<TLogoBlobID> keep; TVector<TLogoBlobID> doNotKeep; - doNotKeep.reserve(MaxCollectGarbageFlagsPerMessage + 1); - for (ui32 idx = 1; idx <= MaxCollectGarbageFlagsPerMessage + 1; ++idx) { + doNotKeep.reserve(MaxCollectGarbageFlagsPerMessage * 2); + doNotKeep.reserve(MaxCollectGarbageFlagsPerMessage * 2); + for (ui32 idx = 0; idx < MaxCollectGarbageFlagsPerMessage * 2; ++idx) { doNotKeep.emplace_back(0x10010000001000Bull, idx, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10); keep.emplace_back(0x10010000001000Bull, idx, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10); - } TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep))); context.SetActor(CreateKeyValueCollector( context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true)); - for (ui32 idx = 0; idx < 7; ++idx) { + auto handleGC = [&](bool withContinueGC, ui32 keepSize, ui32 doNotKeepSize) { TAutoPtr<IEventHandle> handle; auto collect = context.GrabEvent<TEvBlobStorage::TEvCollectGarbage>(handle); UNIT_ASSERT(collect); - + UNIT_ASSERT_VALUES_EQUAL((collect->Keep ? collect->Keep->size() : 0), keepSize); + UNIT_ASSERT_VALUES_EQUAL((collect->DoNotKeep ? collect->DoNotKeep->size() : 0), doNotKeepSize); context.Send(new TEvBlobStorage::TEvCollectGarbageResult(NKikimrProto::OK, collect->TabletId, collect->RecordGeneration, collect->PerGenerationCounter, collect->Channel)); - if (idx == 1 || idx == 5) { - auto complete = context.GrabEvent<TEvKeyValue::TEvPartitialCompleteGC>(handle); + if (withContinueGC) { + auto complete = context.GrabEvent<TEvKeyValue::TEvPartialCompleteGC>(handle); complete->CollectedDoNotKeep.clear(); auto cont = std::make_unique<TEvKeyValue::TEvContinueGC>(std::move(complete->CollectedDoNotKeep)); context.Send(cont.release()); } - } + }; + + handleGC(true, 20, 20); // group 0 + handleGC(true, 10, 10); // group 1 + handleGC(true, 0, 10'000); // group 2 DoNotKeep 30..10029 + handleGC(true, 30, 9'970); // group 2 DoNotKeep 10030..19999 Keep 30.59 + handleGC(false, 10'000, 0); // group 2 Keep 60..10059 + handleGC(false, 9'940, 0); // group 2 Keep 10060..20000 + handleGC(false, 0, 0); // group 3 + handleGC(false, 0, 0); // group 4 + handleGC(false, 0, 0); // group 5 TAutoPtr<IEventHandle> handle; auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvCompleteGC>(handle); diff --git a/ydb/core/keyvalue/keyvalue_events.h b/ydb/core/keyvalue/keyvalue_events.h index 2b9a4f11a0..e504291f51 100644 --- a/ydb/core/keyvalue/keyvalue_events.h +++ b/ydb/core/keyvalue/keyvalue_events.h @@ -25,7 +25,7 @@ struct TEvKeyValue { EvReportWriteLatency, EvUpdateWeights, EvCompleteGC, - EvPartitialCompleteGC, + EvPartialCompleteGC, EvContinueGC, EvRead = EvRequest + 16, @@ -202,11 +202,14 @@ struct TEvKeyValue { TEvCompleteGC() { } }; - struct TEvPartitialCompleteGC : public TEventLocal<TEvPartitialCompleteGC, EvPartitialCompleteGC> { - TMaybe<NKeyValue::THelpers::TGenerationStep> CollectedGenerationStep; + struct TEvPartialCompleteGC : public TEventLocal<TEvPartialCompleteGC, EvPartialCompleteGC> { TVector<TLogoBlobID> CollectedDoNotKeep; - TEvPartitialCompleteGC() { } + TEvPartialCompleteGC() { } + + TEvPartialCompleteGC(TVector<TLogoBlobID> &&doNotKeeps) + : CollectedDoNotKeep(std::move(doNotKeeps)) + { } }; struct TEvContinueGC : public TEventLocal<TEvContinueGC, EvContinueGC> { diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h index 788c78fac9..ae2ce8331c 100644 --- a/ydb/core/keyvalue/keyvalue_flat_impl.h +++ b/ydb/core/keyvalue/keyvalue_flat_impl.h @@ -246,7 +246,7 @@ protected: KV_SIMPLE_TX(EraseCollect); KV_SIMPLE_TX(RegisterInitialGCCompletion); KV_SIMPLE_TX(CompleteGC); - KV_SIMPLE_TX(PartitialCompleteGC); + KV_SIMPLE_TX(PartialCompleteGC); TKeyValueState State; TDeque<TAutoPtr<IEventHandle>> InitialEventsQueue; @@ -319,10 +319,11 @@ protected: Execute(new TTxCompleteGC(this), ctx); } - void Handle(TEvKeyValue::TEvPartitialCompleteGC::TPtr &ev, const TActorContext &ctx) { + void Handle(TEvKeyValue::TEvPartialCompleteGC::TPtr &ev, const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() << " Handle TEvPartitialCompleteGC " << ev->Get()->ToString()); - Execute(new TTxPartitialCompleteGC(this), ctx); + State.OnEvPartialCompleteGC(ev->Get()); + Execute(new TTxPartialCompleteGC(this), ctx); } void Handle(TEvKeyValue::TEvCollect::TPtr &ev, const TActorContext &ctx) { @@ -516,7 +517,7 @@ public: HFunc(TEvKeyValue::TEvEraseCollect, Handle); HFunc(TEvKeyValue::TEvCompleteGC, Handle); - HFunc(TEvKeyValue::TEvPartitialCompleteGC, Handle); + HFunc(TEvKeyValue::TEvPartialCompleteGC, Handle); HFunc(TEvKeyValue::TEvCollect, Handle); HFunc(TEvKeyValue::TEvStoreCollect, Handle); HFunc(TEvKeyValue::TEvRequest, Handle); diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index 2559ef7671..882bfe5680 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -290,8 +290,9 @@ protected: NMetrics::TResourceMetrics* ResourceMetrics; - TMaybe<NKeyValue::THelpers::TGenerationStep> PartitialCollectedGenerationStep; - TVector<TLogoBlobID> PartitialCollectedDoNotKeep; + TMaybe<NKeyValue::THelpers::TGenerationStep> PartialCollectedGenerationStep; + TVector<TLogoBlobID> PartialCollectedDoNotKeep; + bool RepeatGCTX = false; public: TKeyValueState(); @@ -338,15 +339,15 @@ public: void RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep); void UpdateStoredState(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep); void UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState); - void UpdateAfterPartitialGC(ISimpleDb &db, const TActorContext &ctx); + void UpdateAfterPartialGC(ISimpleDb &db, const TActorContext &ctx); void StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx); void StoreCollectComplete(const TActorContext &ctx); void EraseCollectExecute(ISimpleDb &db, const TActorContext &ctx); void EraseCollectComplete(const TActorContext &ctx); void CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx); void CompleteGCComplete(const TActorContext &ctx); - void PartitialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx); - void PartitialCompleteGCComplete(const TActorContext &ctx); + void PartialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx); + void PartialCompleteGCComplete(const TActorContext &ctx); void SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep); void StartGC(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, @@ -356,7 +357,7 @@ public: void OnEvCollectDone(ui64 perGenerationCounterStepSize, TActorId collector, const TActorContext &ctx); void OnEvEraseCollect(const TActorContext &ctx); void OnEvCompleteGC(); - void OnEvPartitialCompleteGC(TEvKeyValue::TEvPartitialCompleteGC *ev); + void OnEvPartialCompleteGC(TEvKeyValue::TEvPartialCompleteGC *ev); void Reply(THolder<TIntermediate> &intermediate, const TActorContext &ctx, const TTabletStorageInfo *info); diff --git a/ydb/core/keyvalue/keyvalue_state_collect.cpp b/ydb/core/keyvalue/keyvalue_state_collect.cpp index 4651509c4c..9ca39f19fd 100644 --- a/ydb/core/keyvalue/keyvalue_state_collect.cpp +++ b/ydb/core/keyvalue/keyvalue_state_collect.cpp @@ -1,4 +1,5 @@ #include "keyvalue_state.h" +#include <ydb/core/util/stlog.h> namespace NKikimr { namespace NKeyValue { @@ -59,6 +60,10 @@ void TKeyValueState::RemoveFromTrashDoNotKeep(ISimpleDb &db, const TActorContext Y_VERIFY(num == 1); CountTrashCollected(id.BlobSize()); } + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC24, "Remove from Trash", + (TabletId, TabletId), + (RemovedCount, collectedDoNotKeep.size()), + (TrashCount, Trash.size())); } void TKeyValueState::RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorContext &ctx, @@ -69,6 +74,7 @@ void TKeyValueState::RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorCon // remove trash entries that were not marked as 'Keep' before, but which are automatically deleted by this barrier // to prevent them from being added to 'DoNotKeep' list after + ui32 counter = 0; for (auto it = Trash.begin(); it != Trash.end(); ) { THelpers::TGenerationStep trashGenStep = THelpers::GenerationStep(*it); bool afterStoredSoftBarrier = trashGenStep > THelpers::TGenerationStep(storedCollectGeneration, storedCollectStep); @@ -77,6 +83,10 @@ void TKeyValueState::RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorCon CountTrashCollected(it->BlobSize()); THelpers::DbEraseTrash(*it, db, ctx); it = Trash.erase(it); + if (++counter >= MaxCollectGarbageFlagsPerMessage) { + RepeatGCTX = true; + break; + } } else { ++it; } @@ -91,12 +101,8 @@ void TKeyValueState::UpdateStoredState(ISimpleDb &db, const TActorContext &ctx, THelpers::DbUpdateState(StoredState, db, ctx); } -void TKeyValueState::UpdateAfterPartitialGC(ISimpleDb &db, const TActorContext &ctx) { - RemoveFromTrashDoNotKeep(db, ctx, PartitialCollectedDoNotKeep); - if (PartitialCollectedGenerationStep) { - RemoveFromTrashBySoftBarrier(db, ctx, *PartitialCollectedGenerationStep); - UpdateStoredState(db, ctx, *PartitialCollectedGenerationStep); - } +void TKeyValueState::UpdateAfterPartialGC(ISimpleDb &db, const TActorContext &ctx) { + RemoveFromTrashDoNotKeep(db, ctx, PartialCollectedDoNotKeep); } void TKeyValueState::UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState) { @@ -174,24 +180,42 @@ void TKeyValueState::EraseCollectComplete(const TActorContext &ctx) { } void TKeyValueState::CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) { - UpdateGC(db, ctx, true, true); + ui64 collectGeneration = CollectOperation->Header.GetCollectGeneration(); + ui64 collectStep = CollectOperation->Header.GetCollectStep(); + auto collectGenStep = THelpers::TGenerationStep(collectGeneration, collectStep); + RemoveFromTrashBySoftBarrier(db, ctx, collectGenStep); + UpdateStoredState(db, ctx, collectGenStep); } void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) { + if (RepeatGCTX) { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC20, "Repeat CompleteGC", + (TabletId, TabletId), + (TrashCount, Trash.size())); + ctx.Send(ctx.SelfID, new TEvKeyValue::TEvCompleteGC()); + RepeatGCTX = false; + return; + } Y_VERIFY(CollectOperation); CollectOperation.Reset(nullptr); IsCollectEventSent = false; + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC22, "CompleteGC Complete", + (TabletId, TabletId), + (TrashCount, Trash.size())); PrepareCollectIfNeeded(ctx); } -void TKeyValueState::PartitialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) { - UpdateAfterPartitialGC(db, ctx); +void TKeyValueState::PartialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) { + UpdateAfterPartialGC(db, ctx); } -void TKeyValueState::PartitialCompleteGCComplete(const TActorContext &ctx) { - PartitialCollectedDoNotKeep.clear(); - ctx.Send(CollectorActorId, new TEvKeyValue::TEvContinueGC(std::move(PartitialCollectedDoNotKeep))); +void TKeyValueState::PartialCompleteGCComplete(const TActorContext &ctx) { + PartialCollectedDoNotKeep.clear(); + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC23, "Send ContinueGC", + (TabletId, TabletId), + (TrashCount, Trash.size())); + ctx.Send(CollectorActorId, new TEvKeyValue::TEvContinueGC(std::move(PartialCollectedDoNotKeep))); } // Prepare the completely new full collect operation with the same gen/step, but with correct keep & doNotKeep lists @@ -302,11 +326,9 @@ void TKeyValueState::OnEvCompleteGC() { CountLatencyBsCollect(); } -void TKeyValueState::OnEvPartitialCompleteGC(TEvKeyValue::TEvPartitialCompleteGC *ev) { - PartitialCollectedGenerationStep = std::move(ev->CollectedGenerationStep); - PartitialCollectedDoNotKeep = std::move(ev->CollectedDoNotKeep); +void TKeyValueState::OnEvPartialCompleteGC(TEvKeyValue::TEvPartialCompleteGC *ev) { + PartialCollectedDoNotKeep = std::move(ev->CollectedDoNotKeep); } } // NKeyValue } // NKikimr - diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp index 53e265aceb..96b9347b7f 100644 --- a/ydb/core/keyvalue/keyvalue_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_ut.cpp @@ -25,6 +25,7 @@ void SetupLogging(TTestActorRuntime& runtime) { NActors::NLog::EPriority otherPriority = NLog::PRI_ERROR; runtime.SetLogPriority(NKikimrServices::KEYVALUE, priority); + runtime.SetLogPriority(NKikimrServices::KEYVALUE_GC, priority); runtime.SetLogPriority(NKikimrServices::BOOTSTRAPPER, priority); runtime.SetLogPriority(NKikimrServices::TABLET_MAIN, priority); runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, priority); @@ -88,6 +89,7 @@ struct TTestContext { Runtime.Reset(new TTestBasicRuntime); Runtime->SetScheduledLimit(200); Runtime->SetLogPriority(NKikimrServices::KEYVALUE, NLog::PRI_DEBUG); + Runtime->SetDispatchedEventsLimit(25'000'000); SetupLogging(*Runtime); SetupTabletServices(*Runtime); setup(*Runtime); @@ -2376,17 +2378,12 @@ Y_UNIT_TEST(TestLargeWriteAndDelete) { tc.Prepare(dispatchName, setup, activeZone); ExecuteObtainLock(tc, 1); ui32 iteration = 0; - // for (ui32 iteration = 0; iteration < 10; ++iteration) { - TDeque<TKeyValuePair> keys; - for (ui32 idx = 0; idx < 15'000; ++idx) { - TString key = TStringBuilder() << iteration << ':' << idx; - keys.push_back({key, "value"}); - } - ExecuteWrite(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); - - TString fromKey = TStringBuilder() << iteration << ':' << 1'000; - ExecuteDeleteRange(tc, fromKey, EBorderKind::Include, "", EBorderKind::Without, 1); - // } + TDeque<TKeyValuePair> keys; + for (ui32 idx = 0; idx < 15'000; ++idx) { + TString key = TStringBuilder() << iteration << ':' << idx; + keys.push_back({key, ""}); + } + ExecuteWrite(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); ExecuteDeleteRange(tc, "", EBorderKind::Without, "", EBorderKind::Without, 1); }); } |