aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2022-09-14 15:13:40 +0300
committerkruall <kruall@ydb.tech>2022-09-14 15:13:40 +0300
commit93f18fccde2eeff3db4784ee8e37bad102a8d5da (patch)
tree11e1ba2c3740bcbb5e1512d4436f01aae87cb90c
parent211d3af96ecbc839df06d53a1868a66bd6741323 (diff)
downloadydb-93f18fccde2eeff3db4784ee8e37bad102a8d5da.tar.gz
Split GC with blob without keep flag,
-rw-r--r--ydb/core/keyvalue/keyvalue_collector.cpp300
-rw-r--r--ydb/core/keyvalue/keyvalue_collector_ut.cpp34
-rw-r--r--ydb/core/keyvalue/keyvalue_events.h11
-rw-r--r--ydb/core/keyvalue/keyvalue_flat_impl.h9
-rw-r--r--ydb/core/keyvalue/keyvalue_state.h13
-rw-r--r--ydb/core/keyvalue/keyvalue_state_collect.cpp54
-rw-r--r--ydb/core/keyvalue/keyvalue_ut.cpp19
7 files changed, 237 insertions, 203 deletions
diff --git a/ydb/core/keyvalue/keyvalue_collector.cpp b/ydb/core/keyvalue/keyvalue_collector.cpp
index 0cac380251..25dc777d24 100644
--- a/ydb/core/keyvalue/keyvalue_collector.cpp
+++ b/ydb/core/keyvalue/keyvalue_collector.cpp
@@ -31,12 +31,6 @@ class TKeyValueCollector : public TActorBootstrapped<TKeyValueCollector> {
// [channel][groupId]
TVector<TMap<ui32, TGroupCollector>> CollectorForGroupForChannel;
ui32 EndChannel = 0;
- bool IsMultiStepMode = false;
- TMap<ui32, TGroupCollector>::iterator CurrentChannelGroup;
-
- // For Keep
- ui32 ChannelIdxInVector = 0;
- TMaybe<THelpers::TGenerationStep> MinGenStepInCircle;
// For DoNotKeep
TVector<TLogoBlobID> CollectedDoNotKeep;
@@ -57,7 +51,6 @@ public:
, BackoffTimer(CollectorErrorInitialBackoffMs, CollectorErrorMaxBackoffMs)
, CollectorErrors(0)
, IsSpringCleanup(isSpringCleanup)
- , IsMultiStepMode(CollectOperation->Keep.size() + CollectOperation->DoNotKeep.size() > MaxCollectGarbageFlagsPerMessage)
{
Y_VERIFY(CollectOperation.Get());
}
@@ -66,11 +59,11 @@ public:
return EndChannel - 1 - channelIdx;
}
- ui32 GetChannelIdxFromVecIdx(ui32 deqIdx) {
+ ui32 GetChannelIdFromVecIdx(ui32 deqIdx) {
return EndChannel - 1 - deqIdx;
}
- void Bootstrap(const TActorContext &ctx) {
+ void Bootstrap() {
EndChannel = TabletInfo->Channels.size();
CollectorForGroupForChannel.resize(EndChannel - BLOB_CHANNEL);
for (ui32 channelIdx = BLOB_CHANNEL; channelIdx < EndChannel; ++channelIdx) {
@@ -88,12 +81,8 @@ public:
}
}
- if (IsMultiStepMode) {
- CollectedDoNotKeep.reserve(MaxCollectGarbageFlagsPerMessage);
- }
-
STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC04, "Start KeyValueCollector",
- (TabletId, TabletInfo->TabletID), (IsMultiStepMode, IsMultiStepMode),
+ (TabletId, TabletInfo->TabletID),
(Keep, CollectOperation->Keep.size()), (DoNotKeep, CollectOperation->DoNotKeep.size()));
Sort(CollectOperation->Keep);
@@ -109,171 +98,138 @@ public:
blob.ToString().c_str());
CollectorForGroupForChannel[GetVecIdxFromChannelIdx(blob.Channel())][groupId].DoNotKeep.push_back(blob);
}
+ ui64 maxDoNotKeepSizeInGroupChannel = 0;
+ for (auto &groups : CollectorForGroupForChannel) {
+ for (auto &[groupId, collector] : groups) {
+ maxDoNotKeepSizeInGroupChannel = Max(maxDoNotKeepSizeInGroupChannel, collector.DoNotKeep.size());
+ }
+ }
+ CollectedDoNotKeep.reserve(Min(maxDoNotKeepSizeInGroupChannel, MaxCollectGarbageFlagsPerMessage));
- MinGenStepInCircle = THelpers::TGenerationStep(Max<ui32>(), Max<ui32>());
- ChannelIdxInVector = CollectorForGroupForChannel.size() - 1;
- CurrentChannelGroup = CollectorForGroupForChannel.back().begin();
- SendTheRequest(ctx);
+ SendTheRequest();
Become(&TThis::StateWait);
}
- bool ChangeChannel(const TActorContext &ctx) {
- if (CollectorForGroupForChannel.back().empty()) {
- while (CollectorForGroupForChannel.size() && CollectorForGroupForChannel.back().empty()) {
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC09, "Empty channel, it's erased",
- (TabletId, TabletInfo->TabletID),
- (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector)));
- CollectorForGroupForChannel.pop_back();
- }
- if (CollectorForGroupForChannel.empty()) {
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC06, "Send TEvCompleteGC and die",
- (TabletId, TabletInfo->TabletID),
- (ErasedGroupId, CurrentChannelGroup->first),
- (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector)));
- ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCompleteGC());
- Die(ctx);
- return true;
- }
- ChannelIdxInVector = CollectorForGroupForChannel.size() - 1;
- CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin();
- } else {
- do {
- if (ChannelIdxInVector) {
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC07, "Move to next channel",
- (TabletId, TabletInfo->TabletID),
- (ErasedGroupId, CurrentChannelGroup->first),
- (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector)));
- ChannelIdxInVector--;
- } else {
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC10, "End of round; Send PartitialCompleteGC",
- (TabletId, TabletInfo->TabletID),
- (ErasedGroupId, CurrentChannelGroup->first),
- (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector)),
- (CollectedDoNotKeep, CollectedDoNotKeep.size()));
- ChannelIdxInVector = CollectorForGroupForChannel.size() - 1;
- CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin();
- SendPartitialCompleteGC(true);
- return true;
- }
- } while (CollectorForGroupForChannel[ChannelIdxInVector].empty());
- CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin();
- }
-
- return false;
+ ui32 GetCurretChannelId() {
+ return GetChannelIdFromVecIdx(CollectorForGroupForChannel.size() - 1);
}
- void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev, const TActorContext &ctx) {
+ void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev) {
NKikimrProto::EReplyStatus status = ev->Get()->Status;
STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC11, "Receive TEvCollectGarbageResult",
(TabletId, TabletInfo->TabletID),
(Status, status));
+ auto collectorsOfCurrentChannel = CollectorForGroupForChannel.rbegin();
+ if (collectorsOfCurrentChannel == CollectorForGroupForChannel.rend()) {
+ STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC17,
+ "Collectors must be exist when we recieve TEvCollectGarbageResult",
+ (TabletId, TabletInfo->TabletID), (CollectorErrors, CollectorErrors));
+ HandleErrorAndDie();
+ return;
+ }
+
+ auto currentCollectorIterator = collectorsOfCurrentChannel->begin();
+ if (currentCollectorIterator == collectorsOfCurrentChannel->end()) {
+ STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC17,
+ "Collectors must be exist in the current channel when we recieve TEvCollectGarbageResult",
+ (TabletId, TabletInfo->TabletID), (Channel, GetCurretChannelId()), (CollectorErrors, CollectorErrors));
+ HandleErrorAndDie();
+ return;
+ }
+
if (status == NKikimrProto::OK) {
// Success
-
bool isLastRequestInCollector = false;
{
- TGroupCollector &collector = CurrentChannelGroup->second;
+ TGroupCollector &collector = currentCollectorIterator->second;
isLastRequestInCollector = (collector.Step == collector.Keep.size() + collector.DoNotKeep.size());
}
if (isLastRequestInCollector) {
STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC08, "Last group was empty, it's erased",
- (TabletId, TabletInfo->TabletID),
- (Status, status),
- (ErasedGroupId, CurrentChannelGroup->first));
- CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].erase(CurrentChannelGroup);
- } else {
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC08, "Move to next group, it's erased",
- (TabletId, TabletInfo->TabletID),
- (Status, status),
- (ErasedGroupId, CurrentChannelGroup->first));
- CurrentChannelGroup++;
+ (TabletId, TabletInfo->TabletID), (GroupId, currentCollectorIterator->first), (Channel, GetCurretChannelId()));
+ currentCollectorIterator = collectorsOfCurrentChannel->erase(currentCollectorIterator);
}
- if (CurrentChannelGroup == CollectorForGroupForChannel[ChannelIdxInVector].end()) {
- if (ChangeChannel(ctx)) {
- return;
- }
+ if (currentCollectorIterator == collectorsOfCurrentChannel->end()) {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC08, "Last channel was empty, it's erased",
+ (TabletId, TabletInfo->TabletID), (Channel, GetCurretChannelId()));
+ CollectorForGroupForChannel.pop_back();
}
- SendTheRequest(ctx);
+ if (CollectedDoNotKeep.size()) {
+ SendPartialCompleteGC();
+ return;
+ }
+ if (CollectorForGroupForChannel.empty()) {
+ SendCompleteGCAndDie();
+ return;
+ }
+ SendTheRequest();
return;
}
- ui32 channelIdx = GetChannelIdxFromVecIdx(ChannelIdxInVector);
- ui32 groupId = CurrentChannelGroup->first;
+ ui32 channelId = GetCurretChannelId();
+ ui32 groupId = currentCollectorIterator->first;
CollectorErrors++;
if (status == NKikimrProto::RACE || status == NKikimrProto::BLOCKED || status == NKikimrProto::NO_GROUP || CollectorErrors > CollectorMaxErrors) {
- LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE_GC, "Tablet# " << TabletInfo->TabletID
- << " Collector got Status# " << NKikimrProto::EReplyStatus_Name(status)
- << " from Group# " << groupId << " Channel# " << channelIdx
- << " CollectorErrors# " << CollectorErrors
- << " Marker# KVC01");
- // Die
- ctx.Send(KeyValueActorId, new TEvents::TEvPoisonPill());
- Die(ctx);
+ STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC01, "Collector got not OK status",
+ (TabletId, TabletInfo->TabletID), (GroupId, groupId), (Channel, channelId), (Status, status),
+ (CollectorErrors, CollectorErrors), (CollectorMaxErrors, CollectorMaxErrors));
+ HandleErrorAndDie();
return;
}
// Rertry
ui64 backoffMs = BackoffTimer.NextBackoffMs();
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC02, "Collector got not OK status, retry",
+ (TabletId, TabletInfo->TabletID), (GroupId, groupId), (Channel, channelId),
+ (Status, status), (BackoffMs, backoffMs), (RetryingImmediately, (backoffMs ? "no" : "yes")));
if (backoffMs) {
const TDuration &timeout = TDuration::MilliSeconds(backoffMs);
- ctx.Schedule(timeout, new TEvents::TEvWakeup());
+ TActivationContext::Schedule(timeout, new IEventHandle(TEvents::TEvWakeup::EventType, 0, SelfId(), SelfId(), nullptr, 0));
} else {
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE_GC, "Tablet# " << TabletInfo->TabletID
- << " Collector got Status# " << NKikimrProto::EReplyStatus_Name(status)
- << " from Group# " << groupId << " Channel# " << channelIdx
- << " Retrying immediately. Marker# KVC02");
- SendTheRequest(ctx);
+ SendTheRequest();
}
}
- void Handle(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx) {
- Y_UNUSED(ev);
- ui32 channelIdx = GetChannelIdxFromVecIdx(ChannelIdxInVector);
- ui32 groupId = CurrentChannelGroup->first;
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE_GC, "Tablet# " << TabletInfo->TabletID
- << " Collector retrying with"
- << " Group# " << groupId << " Channel# " << channelIdx
- << " Marker# KVC03");
- SendTheRequest(ctx);
- return;
- }
-
- void Handle(TEvents::TEvPoisonPill::TPtr &ev, const TActorContext &ctx) {
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC12, "Poisoned",
- (TabletId, TabletInfo->TabletID));
- Y_UNUSED(ev);
- Die(ctx);
- return;
+ void SendPartialCompleteGC() {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC14, "Collector send PartialCompleteGC",
+ (TabletId, TabletInfo->TabletID),
+ (FirstDoNotKeep, (CollectedDoNotKeep.size() ? CollectedDoNotKeep[0].ToString() : "none")),
+ (CollectedDoNotKeepSize, CollectedDoNotKeep.size()));
+ Send(KeyValueActorId, new TEvKeyValue::TEvPartialCompleteGC(std::move(CollectedDoNotKeep)));
}
- void Handle(TEvKeyValue::TEvContinueGC::TPtr &ev) {
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC13, "Collector continue GC",
+ void SendCompleteGCAndDie() {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC19, "Collector send CompleteGC",
(TabletId, TabletInfo->TabletID));
- MinGenStepInCircle = {};
- CollectedDoNotKeep = std::move(ev->Get()->Buffer);
- CollectedDoNotKeep.clear();
- SendTheRequest(TActivationContext::AsActorContext());
+ Send(KeyValueActorId, new TEvKeyValue::TEvCompleteGC());
+ PassAway();
}
- void SendPartitialCompleteGC(bool endCircle) {
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC14, "Collector send PartitialCompleteGC",
- (TabletId, TabletInfo->TabletID), (EndOfRound, (endCircle ? "yes" : "no")));
- auto ev = std::make_unique<TEvKeyValue::TEvPartitialCompleteGC>();
- if (endCircle && MinGenStepInCircle) {
- ev->CollectedGenerationStep = std::move(MinGenStepInCircle);
+ void SendTheRequest() {
+ auto collectorsOfCurrentChannel = CollectorForGroupForChannel.rbegin();
+ if (collectorsOfCurrentChannel == CollectorForGroupForChannel.rend()) {
+ STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC30,
+ "Collectors must be exist when we try to send the request",
+ (TabletId, TabletInfo->TabletID), (CollectorErrors, CollectorErrors));
+ HandleErrorAndDie();
+ return;
}
- ev->CollectedDoNotKeep = std::move(CollectedDoNotKeep);
- TActivationContext::Send(new IEventHandle(KeyValueActorId, SelfId(), ev.release()));
- }
+ auto currentCollectorIterator = collectorsOfCurrentChannel->begin();
+ if (currentCollectorIterator == collectorsOfCurrentChannel->end()) {
+ STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC31,
+ "Collectors must be exist in the current channel we try to send the request",
+ (TabletId, TabletInfo->TabletID), (Channel, GetCurretChannelId()), (CollectorErrors, CollectorErrors));
+ HandleErrorAndDie();
+ return;
+ }
- void SendTheRequest(const TActorContext &ctx) {
THolder<TVector<TLogoBlobID>> keep;
THolder<TVector<TLogoBlobID>> doNotKeep;
- TGroupCollector &collector = CurrentChannelGroup->second;
+ TGroupCollector &collector = currentCollectorIterator->second;
ui32 doNotKeepSize = collector.DoNotKeep.size();
if (collector.Step < doNotKeepSize) {
@@ -282,16 +238,6 @@ public:
doNotKeepSize = 0;
}
- if (CollectedDoNotKeep.size() && doNotKeepSize && CollectedDoNotKeep.size() + doNotKeepSize > MaxCollectGarbageFlagsPerMessage) {
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC15, "CollectedDoNotKeep was oevrflow; Send PartitialCompleteGC",
- (TabletId, TabletInfo->TabletID),
- (doNotKeepSize, doNotKeepSize),
- (CollectedDoNotKeep.size, CollectedDoNotKeep.size()),
- (MaxCollectGarbageFlagsPerMessage, MaxCollectGarbageFlagsPerMessage));
- SendPartitialCompleteGC(false);
- return;
- }
-
if (doNotKeepSize) {
doNotKeepSize = Min(doNotKeepSize, (ui32)MaxCollectGarbageFlagsPerMessage);
doNotKeep.Reset(new TVector<TLogoBlobID>(doNotKeepSize));
@@ -324,37 +270,83 @@ public:
(*keep)[idx] = *it;
}
collector.Step += idx;
- if (collectedGenStep && MinGenStepInCircle) {
- MinGenStepInCircle = Min(*MinGenStepInCircle, *collectedGenStep);
- } else if (collectedGenStep) {
- MinGenStepInCircle = collectedGenStep;
- }
}
bool isLast = (collector.Keep.size() + collector.DoNotKeep.size() == collector.Step);
ui32 collectGeneration = CollectOperation->Header.CollectGeneration;
ui32 collectStep = CollectOperation->Header.CollectStep;
- ui32 channelIdx = GetChannelIdxFromVecIdx(CollectorForGroupForChannel.size() - 1);
- ui32 groupId = CurrentChannelGroup->first;
+ ui32 channelIdx = GetChannelIdFromVecIdx(CollectorForGroupForChannel.size() - 1);
+ ui32 groupId = currentCollectorIterator->first;
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC16, "Send GC request",
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC16, "Send GC request",
(TabletId, TabletInfo->TabletID), (CollectGeneration, collectGeneration),
(CollectStep, collectStep), (ChannelIdx, channelIdx), (GroupId, groupId),
(KeepSize, keepSize), (DoNotKeepSize, doNotKeepSize), (IsLast, isLast));
- SendToBSProxy(ctx, groupId,
+ SendToBSProxy(TActivationContext::AsActorContext(), groupId,
new TEvBlobStorage::TEvCollectGarbage(TabletInfo->TabletID, RecordGeneration, PerGenerationCounter,
channelIdx, isLast, collectGeneration, collectStep,
keep.Release(), doNotKeep.Release(), TInstant::Max(), true), (ui64)TKeyValueState::ECollectCookie::Soft);
}
- STFUNC(StateWait) {
+ void HandleContinueGC(TEvKeyValue::TEvContinueGC::TPtr &ev) {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC13, "Collector continue GC",
+ (TabletId, TabletInfo->TabletID));
+ if (CollectorForGroupForChannel.empty()) {
+ SendCompleteGCAndDie();
+ return;
+ }
+ CollectedDoNotKeep = std::move(ev->Get()->Buffer);
+ CollectedDoNotKeep.clear();
+ SendTheRequest();
+ }
+
+ void HandleWakeUp() {
+ auto collectorsOfCurrentChannel = CollectorForGroupForChannel.rbegin();
+ if (collectorsOfCurrentChannel == CollectorForGroupForChannel.rend()) {
+ STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC17,
+ "Collectors must be exist when we try to resend the request",
+ (TabletId, TabletInfo->TabletID), (CollectorErrors, CollectorErrors));
+ HandleErrorAndDie();
+ return;
+ }
+
+ auto currentCollectorIterator = collectorsOfCurrentChannel->begin();
+ if (currentCollectorIterator == collectorsOfCurrentChannel->end()) {
+ STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC17,
+ "Collectors must be exist in the current channel we try to resend the request",
+ (TabletId, TabletInfo->TabletID), (Channel, GetCurretChannelId()), (CollectorErrors, CollectorErrors));
+ HandleErrorAndDie();
+ return;
+ }
+
+ ui32 channelId = GetCurretChannelId();
+ ui32 groupId = currentCollectorIterator->first;
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC03, "Collector retrying",
+ (TabletId, TabletInfo->TabletID), (GroupId, groupId), (Channel, channelId));
+ SendTheRequest();
+ }
+
+ void HandlePoisonPill() {
+ STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC12, "Poisoned",
+ (TabletId, TabletInfo->TabletID));
+ PassAway();
+ }
+
+ void HandleErrorAndDie() {
+ STLOG(NLog::PRI_ERROR, NKikimrServices::KEYVALUE_GC, KVC18, "Garbage Collector catch the error, send PoisonPill to the tablet",
+ (TabletId, TabletInfo->TabletID));
+ Send(KeyValueActorId, new TEvents::TEvPoisonPill());
+ PassAway();
+ }
+
+ STATEFN(StateWait) {
switch (ev->GetTypeRewrite()) {
- HFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
- hFunc(TEvKeyValue::TEvContinueGC, Handle);
- HFunc(TEvents::TEvWakeup, Handle);
- HFunc(TEvents::TEvPoisonPill, Handle);
+ hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
+ hFunc(TEvKeyValue::TEvContinueGC, HandleContinueGC);
+ cFunc(TEvents::TEvWakeup::EventType, HandleWakeUp);
+ cFunc(TEvents::TEvPoisonPill::EventType, HandlePoisonPill);
default:
break;
}
diff --git a/ydb/core/keyvalue/keyvalue_collector_ut.cpp b/ydb/core/keyvalue/keyvalue_collector_ut.cpp
index f414deca5c..835f1be056 100644
--- a/ydb/core/keyvalue/keyvalue_collector_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_collector_ut.cpp
@@ -217,6 +217,13 @@ Y_UNIT_TEST(TestKeyValueCollectorMultiple) {
context.Send(new TEvBlobStorage::TEvCollectGarbageResult(NKikimrProto::OK, collect->TabletId,
collect->RecordGeneration, collect->PerGenerationCounter, collect->Channel));
+
+ if (collect && collect->DoNotKeep && collect->DoNotKeep->size()) {
+ auto complete = context.GrabEvent<TEvKeyValue::TEvPartialCompleteGC>(handle);
+ complete->CollectedDoNotKeep.clear();
+ auto cont = std::make_unique<TEvKeyValue::TEvContinueGC>(std::move(complete->CollectedDoNotKeep));
+ context.Send(cont.release());
+ }
}
UNIT_ASSERT(erased == 8);
@@ -232,32 +239,43 @@ Y_UNIT_TEST(TestKeyValueCollectorMany) {
TVector<TLogoBlobID> keep;
TVector<TLogoBlobID> doNotKeep;
- doNotKeep.reserve(MaxCollectGarbageFlagsPerMessage + 1);
- for (ui32 idx = 1; idx <= MaxCollectGarbageFlagsPerMessage + 1; ++idx) {
+ doNotKeep.reserve(MaxCollectGarbageFlagsPerMessage * 2);
+ doNotKeep.reserve(MaxCollectGarbageFlagsPerMessage * 2);
+ for (ui32 idx = 0; idx < MaxCollectGarbageFlagsPerMessage * 2; ++idx) {
doNotKeep.emplace_back(0x10010000001000Bull, idx, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10);
keep.emplace_back(0x10010000001000Bull, idx, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10);
-
}
TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep)));
context.SetActor(CreateKeyValueCollector(
context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true));
- for (ui32 idx = 0; idx < 7; ++idx) {
+ auto handleGC = [&](bool withContinueGC, ui32 keepSize, ui32 doNotKeepSize) {
TAutoPtr<IEventHandle> handle;
auto collect = context.GrabEvent<TEvBlobStorage::TEvCollectGarbage>(handle);
UNIT_ASSERT(collect);
-
+ UNIT_ASSERT_VALUES_EQUAL((collect->Keep ? collect->Keep->size() : 0), keepSize);
+ UNIT_ASSERT_VALUES_EQUAL((collect->DoNotKeep ? collect->DoNotKeep->size() : 0), doNotKeepSize);
context.Send(new TEvBlobStorage::TEvCollectGarbageResult(NKikimrProto::OK, collect->TabletId,
collect->RecordGeneration, collect->PerGenerationCounter, collect->Channel));
- if (idx == 1 || idx == 5) {
- auto complete = context.GrabEvent<TEvKeyValue::TEvPartitialCompleteGC>(handle);
+ if (withContinueGC) {
+ auto complete = context.GrabEvent<TEvKeyValue::TEvPartialCompleteGC>(handle);
complete->CollectedDoNotKeep.clear();
auto cont = std::make_unique<TEvKeyValue::TEvContinueGC>(std::move(complete->CollectedDoNotKeep));
context.Send(cont.release());
}
- }
+ };
+
+ handleGC(true, 20, 20); // group 0
+ handleGC(true, 10, 10); // group 1
+ handleGC(true, 0, 10'000); // group 2 DoNotKeep 30..10029
+ handleGC(true, 30, 9'970); // group 2 DoNotKeep 10030..19999 Keep 30.59
+ handleGC(false, 10'000, 0); // group 2 Keep 60..10059
+ handleGC(false, 9'940, 0); // group 2 Keep 10060..20000
+ handleGC(false, 0, 0); // group 3
+ handleGC(false, 0, 0); // group 4
+ handleGC(false, 0, 0); // group 5
TAutoPtr<IEventHandle> handle;
auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvCompleteGC>(handle);
diff --git a/ydb/core/keyvalue/keyvalue_events.h b/ydb/core/keyvalue/keyvalue_events.h
index 2b9a4f11a0..e504291f51 100644
--- a/ydb/core/keyvalue/keyvalue_events.h
+++ b/ydb/core/keyvalue/keyvalue_events.h
@@ -25,7 +25,7 @@ struct TEvKeyValue {
EvReportWriteLatency,
EvUpdateWeights,
EvCompleteGC,
- EvPartitialCompleteGC,
+ EvPartialCompleteGC,
EvContinueGC,
EvRead = EvRequest + 16,
@@ -202,11 +202,14 @@ struct TEvKeyValue {
TEvCompleteGC() { }
};
- struct TEvPartitialCompleteGC : public TEventLocal<TEvPartitialCompleteGC, EvPartitialCompleteGC> {
- TMaybe<NKeyValue::THelpers::TGenerationStep> CollectedGenerationStep;
+ struct TEvPartialCompleteGC : public TEventLocal<TEvPartialCompleteGC, EvPartialCompleteGC> {
TVector<TLogoBlobID> CollectedDoNotKeep;
- TEvPartitialCompleteGC() { }
+ TEvPartialCompleteGC() { }
+
+ TEvPartialCompleteGC(TVector<TLogoBlobID> &&doNotKeeps)
+ : CollectedDoNotKeep(std::move(doNotKeeps))
+ { }
};
struct TEvContinueGC : public TEventLocal<TEvContinueGC, EvContinueGC> {
diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h
index 788c78fac9..ae2ce8331c 100644
--- a/ydb/core/keyvalue/keyvalue_flat_impl.h
+++ b/ydb/core/keyvalue/keyvalue_flat_impl.h
@@ -246,7 +246,7 @@ protected:
KV_SIMPLE_TX(EraseCollect);
KV_SIMPLE_TX(RegisterInitialGCCompletion);
KV_SIMPLE_TX(CompleteGC);
- KV_SIMPLE_TX(PartitialCompleteGC);
+ KV_SIMPLE_TX(PartialCompleteGC);
TKeyValueState State;
TDeque<TAutoPtr<IEventHandle>> InitialEventsQueue;
@@ -319,10 +319,11 @@ protected:
Execute(new TTxCompleteGC(this), ctx);
}
- void Handle(TEvKeyValue::TEvPartitialCompleteGC::TPtr &ev, const TActorContext &ctx) {
+ void Handle(TEvKeyValue::TEvPartialCompleteGC::TPtr &ev, const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
<< " Handle TEvPartitialCompleteGC " << ev->Get()->ToString());
- Execute(new TTxPartitialCompleteGC(this), ctx);
+ State.OnEvPartialCompleteGC(ev->Get());
+ Execute(new TTxPartialCompleteGC(this), ctx);
}
void Handle(TEvKeyValue::TEvCollect::TPtr &ev, const TActorContext &ctx) {
@@ -516,7 +517,7 @@ public:
HFunc(TEvKeyValue::TEvEraseCollect, Handle);
HFunc(TEvKeyValue::TEvCompleteGC, Handle);
- HFunc(TEvKeyValue::TEvPartitialCompleteGC, Handle);
+ HFunc(TEvKeyValue::TEvPartialCompleteGC, Handle);
HFunc(TEvKeyValue::TEvCollect, Handle);
HFunc(TEvKeyValue::TEvStoreCollect, Handle);
HFunc(TEvKeyValue::TEvRequest, Handle);
diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h
index 2559ef7671..882bfe5680 100644
--- a/ydb/core/keyvalue/keyvalue_state.h
+++ b/ydb/core/keyvalue/keyvalue_state.h
@@ -290,8 +290,9 @@ protected:
NMetrics::TResourceMetrics* ResourceMetrics;
- TMaybe<NKeyValue::THelpers::TGenerationStep> PartitialCollectedGenerationStep;
- TVector<TLogoBlobID> PartitialCollectedDoNotKeep;
+ TMaybe<NKeyValue::THelpers::TGenerationStep> PartialCollectedGenerationStep;
+ TVector<TLogoBlobID> PartialCollectedDoNotKeep;
+ bool RepeatGCTX = false;
public:
TKeyValueState();
@@ -338,15 +339,15 @@ public:
void RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep);
void UpdateStoredState(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep);
void UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState);
- void UpdateAfterPartitialGC(ISimpleDb &db, const TActorContext &ctx);
+ void UpdateAfterPartialGC(ISimpleDb &db, const TActorContext &ctx);
void StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx);
void StoreCollectComplete(const TActorContext &ctx);
void EraseCollectExecute(ISimpleDb &db, const TActorContext &ctx);
void EraseCollectComplete(const TActorContext &ctx);
void CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx);
void CompleteGCComplete(const TActorContext &ctx);
- void PartitialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx);
- void PartitialCompleteGCComplete(const TActorContext &ctx);
+ void PartialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx);
+ void PartialCompleteGCComplete(const TActorContext &ctx);
void SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep);
void StartGC(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
@@ -356,7 +357,7 @@ public:
void OnEvCollectDone(ui64 perGenerationCounterStepSize, TActorId collector, const TActorContext &ctx);
void OnEvEraseCollect(const TActorContext &ctx);
void OnEvCompleteGC();
- void OnEvPartitialCompleteGC(TEvKeyValue::TEvPartitialCompleteGC *ev);
+ void OnEvPartialCompleteGC(TEvKeyValue::TEvPartialCompleteGC *ev);
void Reply(THolder<TIntermediate> &intermediate, const TActorContext &ctx, const TTabletStorageInfo *info);
diff --git a/ydb/core/keyvalue/keyvalue_state_collect.cpp b/ydb/core/keyvalue/keyvalue_state_collect.cpp
index 4651509c4c..9ca39f19fd 100644
--- a/ydb/core/keyvalue/keyvalue_state_collect.cpp
+++ b/ydb/core/keyvalue/keyvalue_state_collect.cpp
@@ -1,4 +1,5 @@
#include "keyvalue_state.h"
+#include <ydb/core/util/stlog.h>
namespace NKikimr {
namespace NKeyValue {
@@ -59,6 +60,10 @@ void TKeyValueState::RemoveFromTrashDoNotKeep(ISimpleDb &db, const TActorContext
Y_VERIFY(num == 1);
CountTrashCollected(id.BlobSize());
}
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC24, "Remove from Trash",
+ (TabletId, TabletId),
+ (RemovedCount, collectedDoNotKeep.size()),
+ (TrashCount, Trash.size()));
}
void TKeyValueState::RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorContext &ctx,
@@ -69,6 +74,7 @@ void TKeyValueState::RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorCon
// remove trash entries that were not marked as 'Keep' before, but which are automatically deleted by this barrier
// to prevent them from being added to 'DoNotKeep' list after
+ ui32 counter = 0;
for (auto it = Trash.begin(); it != Trash.end(); ) {
THelpers::TGenerationStep trashGenStep = THelpers::GenerationStep(*it);
bool afterStoredSoftBarrier = trashGenStep > THelpers::TGenerationStep(storedCollectGeneration, storedCollectStep);
@@ -77,6 +83,10 @@ void TKeyValueState::RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorCon
CountTrashCollected(it->BlobSize());
THelpers::DbEraseTrash(*it, db, ctx);
it = Trash.erase(it);
+ if (++counter >= MaxCollectGarbageFlagsPerMessage) {
+ RepeatGCTX = true;
+ break;
+ }
} else {
++it;
}
@@ -91,12 +101,8 @@ void TKeyValueState::UpdateStoredState(ISimpleDb &db, const TActorContext &ctx,
THelpers::DbUpdateState(StoredState, db, ctx);
}
-void TKeyValueState::UpdateAfterPartitialGC(ISimpleDb &db, const TActorContext &ctx) {
- RemoveFromTrashDoNotKeep(db, ctx, PartitialCollectedDoNotKeep);
- if (PartitialCollectedGenerationStep) {
- RemoveFromTrashBySoftBarrier(db, ctx, *PartitialCollectedGenerationStep);
- UpdateStoredState(db, ctx, *PartitialCollectedGenerationStep);
- }
+void TKeyValueState::UpdateAfterPartialGC(ISimpleDb &db, const TActorContext &ctx) {
+ RemoveFromTrashDoNotKeep(db, ctx, PartialCollectedDoNotKeep);
}
void TKeyValueState::UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState) {
@@ -174,24 +180,42 @@ void TKeyValueState::EraseCollectComplete(const TActorContext &ctx) {
}
void TKeyValueState::CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) {
- UpdateGC(db, ctx, true, true);
+ ui64 collectGeneration = CollectOperation->Header.GetCollectGeneration();
+ ui64 collectStep = CollectOperation->Header.GetCollectStep();
+ auto collectGenStep = THelpers::TGenerationStep(collectGeneration, collectStep);
+ RemoveFromTrashBySoftBarrier(db, ctx, collectGenStep);
+ UpdateStoredState(db, ctx, collectGenStep);
}
void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) {
+ if (RepeatGCTX) {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC20, "Repeat CompleteGC",
+ (TabletId, TabletId),
+ (TrashCount, Trash.size()));
+ ctx.Send(ctx.SelfID, new TEvKeyValue::TEvCompleteGC());
+ RepeatGCTX = false;
+ return;
+ }
Y_VERIFY(CollectOperation);
CollectOperation.Reset(nullptr);
IsCollectEventSent = false;
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC22, "CompleteGC Complete",
+ (TabletId, TabletId),
+ (TrashCount, Trash.size()));
PrepareCollectIfNeeded(ctx);
}
-void TKeyValueState::PartitialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) {
- UpdateAfterPartitialGC(db, ctx);
+void TKeyValueState::PartialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) {
+ UpdateAfterPartialGC(db, ctx);
}
-void TKeyValueState::PartitialCompleteGCComplete(const TActorContext &ctx) {
- PartitialCollectedDoNotKeep.clear();
- ctx.Send(CollectorActorId, new TEvKeyValue::TEvContinueGC(std::move(PartitialCollectedDoNotKeep)));
+void TKeyValueState::PartialCompleteGCComplete(const TActorContext &ctx) {
+ PartialCollectedDoNotKeep.clear();
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC23, "Send ContinueGC",
+ (TabletId, TabletId),
+ (TrashCount, Trash.size()));
+ ctx.Send(CollectorActorId, new TEvKeyValue::TEvContinueGC(std::move(PartialCollectedDoNotKeep)));
}
// Prepare the completely new full collect operation with the same gen/step, but with correct keep & doNotKeep lists
@@ -302,11 +326,9 @@ void TKeyValueState::OnEvCompleteGC() {
CountLatencyBsCollect();
}
-void TKeyValueState::OnEvPartitialCompleteGC(TEvKeyValue::TEvPartitialCompleteGC *ev) {
- PartitialCollectedGenerationStep = std::move(ev->CollectedGenerationStep);
- PartitialCollectedDoNotKeep = std::move(ev->CollectedDoNotKeep);
+void TKeyValueState::OnEvPartialCompleteGC(TEvKeyValue::TEvPartialCompleteGC *ev) {
+ PartialCollectedDoNotKeep = std::move(ev->CollectedDoNotKeep);
}
} // NKeyValue
} // NKikimr
-
diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp
index 53e265aceb..96b9347b7f 100644
--- a/ydb/core/keyvalue/keyvalue_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_ut.cpp
@@ -25,6 +25,7 @@ void SetupLogging(TTestActorRuntime& runtime) {
NActors::NLog::EPriority otherPriority = NLog::PRI_ERROR;
runtime.SetLogPriority(NKikimrServices::KEYVALUE, priority);
+ runtime.SetLogPriority(NKikimrServices::KEYVALUE_GC, priority);
runtime.SetLogPriority(NKikimrServices::BOOTSTRAPPER, priority);
runtime.SetLogPriority(NKikimrServices::TABLET_MAIN, priority);
runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, priority);
@@ -88,6 +89,7 @@ struct TTestContext {
Runtime.Reset(new TTestBasicRuntime);
Runtime->SetScheduledLimit(200);
Runtime->SetLogPriority(NKikimrServices::KEYVALUE, NLog::PRI_DEBUG);
+ Runtime->SetDispatchedEventsLimit(25'000'000);
SetupLogging(*Runtime);
SetupTabletServices(*Runtime);
setup(*Runtime);
@@ -2376,17 +2378,12 @@ Y_UNIT_TEST(TestLargeWriteAndDelete) {
tc.Prepare(dispatchName, setup, activeZone);
ExecuteObtainLock(tc, 1);
ui32 iteration = 0;
- // for (ui32 iteration = 0; iteration < 10; ++iteration) {
- TDeque<TKeyValuePair> keys;
- for (ui32 idx = 0; idx < 15'000; ++idx) {
- TString key = TStringBuilder() << iteration << ':' << idx;
- keys.push_back({key, "value"});
- }
- ExecuteWrite(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
-
- TString fromKey = TStringBuilder() << iteration << ':' << 1'000;
- ExecuteDeleteRange(tc, fromKey, EBorderKind::Include, "", EBorderKind::Without, 1);
- // }
+ TDeque<TKeyValuePair> keys;
+ for (ui32 idx = 0; idx < 15'000; ++idx) {
+ TString key = TStringBuilder() << iteration << ':' << idx;
+ keys.push_back({key, ""});
+ }
+ ExecuteWrite(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
ExecuteDeleteRange(tc, "", EBorderKind::Without, "", EBorderKind::Without, 1);
});
}