aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2022-07-08 13:40:28 +0300
committerkruall <kruall@ydb.tech>2022-07-08 13:40:28 +0300
commit635fcbaa8b2abae1672956b1ff43cd07ec7d0683 (patch)
tree4e2733a9a765e273ce687ddfe89532a60e951b33
parentc4d2bb05d4c15976327070de4b7669774da1129e (diff)
downloadydb-635fcbaa8b2abae1672956b1ff43cd07ec7d0683.tar.gz
Limit count of blobs in one GC request,
-rw-r--r--ydb/core/keyvalue/keyvalue_collector.cpp210
-rw-r--r--ydb/core/keyvalue/keyvalue_collector_ut.cpp39
-rw-r--r--ydb/core/keyvalue/keyvalue_events.h17
-rw-r--r--ydb/core/keyvalue/keyvalue_flat_impl.h10
-rw-r--r--ydb/core/keyvalue/keyvalue_state.h14
-rw-r--r--ydb/core/keyvalue/keyvalue_state_collect.cpp92
6 files changed, 317 insertions, 65 deletions
diff --git a/ydb/core/keyvalue/keyvalue_collector.cpp b/ydb/core/keyvalue/keyvalue_collector.cpp
index c4d9b12054..69b1e6fa1c 100644
--- a/ydb/core/keyvalue/keyvalue_collector.cpp
+++ b/ydb/core/keyvalue/keyvalue_collector.cpp
@@ -1,5 +1,7 @@
#include "keyvalue_flat_impl.h"
+
+#include <ydb/core/base/counters.h>
#include <ydb/core/blobstorage/dsproxy/blobstorage_backoff.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -12,6 +14,7 @@ namespace NKeyValue {
struct TGroupCollector {
TDeque<TLogoBlobID> Keep;
TDeque<TLogoBlobID> DoNotKeep;
+ ui32 Step = 0;
};
class TKeyValueCollector : public TActorBootstrapped<TKeyValueCollector> {
@@ -24,7 +27,19 @@ class TKeyValueCollector : public TActorBootstrapped<TKeyValueCollector> {
ui64 CollectorErrors;
bool IsSpringCleanup;
- TMap<ui32, TMap<ui32, TGroupCollector>> CollectorForGroupForChannel;
+ // [channel][groupId]
+ TVector<TMap<ui32, TGroupCollector>> CollectorForGroupForChannel;
+ ui32 EndChannel = 0;
+ bool IsMultiStepMode = false;
+ TMap<ui32, TGroupCollector>::iterator CurrentChannelGroup;
+
+ // For Keep
+ ui32 ChannelIdxInVector = 0;
+ TMaybe<THelpers::TGenerationStep> MinGenStepInCircle;
+
+ // For DoNotKeep
+ TVector<TLogoBlobID> CollectedDoNotKeep;
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KEYVALUE_ACTOR;
@@ -41,69 +56,118 @@ public:
, BackoffTimer(CollectorErrorInitialBackoffMs, CollectorErrorMaxBackoffMs)
, CollectorErrors(0)
, IsSpringCleanup(isSpringCleanup)
+ , IsMultiStepMode(CollectOperation->Keep.size() + CollectOperation->DoNotKeep.size() > MaxCollectGarbageFlagsPerMessage)
{
Y_VERIFY(CollectOperation.Get());
}
+ ui32 GetVecIdxFromChannelIdx(ui32 channelIdx) {
+ return EndChannel - 1 - channelIdx;
+ }
+
+ ui32 GetChannelIdxFromVecIdx(ui32 deqIdx) {
+ return EndChannel - 1 - deqIdx;
+ }
+
void Bootstrap(const TActorContext &ctx) {
- ui32 endChannel = TabletInfo->Channels.size();
- for (ui32 channelIdx = BLOB_CHANNEL; channelIdx < endChannel; ++channelIdx) {
+ EndChannel = TabletInfo->Channels.size();
+ CollectorForGroupForChannel.resize(EndChannel - BLOB_CHANNEL);
+ for (ui32 channelIdx = BLOB_CHANNEL; channelIdx < EndChannel; ++channelIdx) {
const auto *channelInfo = TabletInfo->ChannelInfo(channelIdx);
for (auto historyIt = channelInfo->History.begin(); historyIt != channelInfo->History.end(); ++historyIt) {
if (IsSpringCleanup) {
- CollectorForGroupForChannel[channelIdx][historyIt->GroupID];
+ CollectorForGroupForChannel[GetVecIdxFromChannelIdx(channelIdx)][historyIt->GroupID];
} else {
auto nextHistoryIt = historyIt;
nextHistoryIt++;
if (nextHistoryIt == channelInfo->History.end()) {
- CollectorForGroupForChannel[channelIdx][historyIt->GroupID];
+ CollectorForGroupForChannel[GetVecIdxFromChannelIdx(channelIdx)][historyIt->GroupID];
}
}
}
}
+ if (IsMultiStepMode) {
+ CollectedDoNotKeep.reserve(MaxCollectGarbageFlagsPerMessage);
+ }
+
+ Sort(CollectOperation->Keep);
for (const auto &blob: CollectOperation->Keep) {
ui32 groupId = TabletInfo->ChannelInfo(blob.Channel())->GroupForGeneration(blob.Generation());
Y_VERIFY(groupId != Max<ui32>(), "Keep Blob# %s is mapped to an invalid group (-1)!",
blob.ToString().c_str());
- CollectorForGroupForChannel[blob.Channel()][groupId].Keep.push_back(blob);
+ CollectorForGroupForChannel[GetVecIdxFromChannelIdx(blob.Channel())][groupId].Keep.push_back(blob);
}
for (const auto &blob: CollectOperation->DoNotKeep) {
const ui32 groupId = TabletInfo->ChannelInfo(blob.Channel())->GroupForGeneration(blob.Generation());
Y_VERIFY(groupId != Max<ui32>(), "DoNotKeep Blob# %s is mapped to an invalid group (-1)!",
blob.ToString().c_str());
- CollectorForGroupForChannel[blob.Channel()][groupId].DoNotKeep.push_back(blob);
+ CollectorForGroupForChannel[GetVecIdxFromChannelIdx(blob.Channel())][groupId].DoNotKeep.push_back(blob);
}
+ MinGenStepInCircle = THelpers::TGenerationStep(Max<ui32>(), Max<ui32>());
+ ChannelIdxInVector = CollectorForGroupForChannel.size() - 1;
+ CurrentChannelGroup = CollectorForGroupForChannel.back().begin();
SendTheRequest(ctx);
Become(&TThis::StateWait);
}
+ bool ChangeGroup(const TActorContext &ctx) {
+ if (CollectorForGroupForChannel.back().empty()) {
+ while (CollectorForGroupForChannel.size() && CollectorForGroupForChannel.back().empty()) {
+ CollectorForGroupForChannel.pop_back();
+ }
+ if (CollectorForGroupForChannel.empty()) {
+ ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCompleteGC());
+ Die(ctx);
+ return true;
+ }
+ ChannelIdxInVector = CollectorForGroupForChannel.size() - 1;
+ CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin();
+ } else {
+ do {
+ if (ChannelIdxInVector) {
+ ChannelIdxInVector--;
+ } else {
+ ChannelIdxInVector = CollectorForGroupForChannel.size() - 1;
+ CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin();
+ SendPartitialCompleteGC(true);
+ return true;
+ }
+ } while (CollectorForGroupForChannel[ChannelIdxInVector].empty());
+ CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin();
+ }
+ return false;
+ }
+
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev, const TActorContext &ctx) {
+
NKikimrProto::EReplyStatus status = ev->Get()->Status;
if (status == NKikimrProto::OK) {
// Success
- CollectorForGroupForChannel.begin()->second.erase(
- CollectorForGroupForChannel.begin()->second.begin());
- if (CollectorForGroupForChannel.begin()->second.empty()) {
- CollectorForGroupForChannel.erase(
- CollectorForGroupForChannel.begin());
+
+ bool isLastRequestInCollector = false;
+ {
+ TGroupCollector &collector = CurrentChannelGroup->second;
+ isLastRequestInCollector = (collector.Step == collector.Keep.size() + collector.DoNotKeep.size());
}
- if (CollectorForGroupForChannel.empty()) {
- ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCompleteGC());
- Die(ctx);
- return;
+ if (isLastRequestInCollector) {
+ CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].erase(CurrentChannelGroup);
+ } else {
+ CurrentChannelGroup++;
+ }
+ if (CurrentChannelGroup == CollectorForGroupForChannel[ChannelIdxInVector].end()) {
+ if (ChangeGroup(ctx)) {
+ return;
+ }
}
SendTheRequest(ctx);
return;
}
- Y_VERIFY(CollectorForGroupForChannel.begin() != CollectorForGroupForChannel.end());
- Y_VERIFY(CollectorForGroupForChannel.begin()->second.begin() !=
- CollectorForGroupForChannel.begin()->second.end());
- ui32 channelIdx = CollectorForGroupForChannel.begin()->first;
- ui32 groupId = CollectorForGroupForChannel.begin()->second.begin()->first;
+ ui32 channelIdx = GetChannelIdxFromVecIdx(ChannelIdxInVector);
+ ui32 groupId = CurrentChannelGroup->first;
CollectorErrors++;
if (status == NKikimrProto::RACE || status == NKikimrProto::BLOCKED || status == NKikimrProto::NO_GROUP || CollectorErrors > CollectorMaxErrors) {
@@ -134,11 +198,8 @@ public:
void Handle(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx) {
Y_UNUSED(ev);
- Y_VERIFY(CollectorForGroupForChannel.begin() != CollectorForGroupForChannel.end());
- Y_VERIFY(CollectorForGroupForChannel.begin()->second.begin() !=
- CollectorForGroupForChannel.begin()->second.end());
- ui32 channelIdx = CollectorForGroupForChannel.begin()->first;
- ui32 groupId = CollectorForGroupForChannel.begin()->second.begin()->first;
+ ui32 channelIdx = GetChannelIdxFromVecIdx(ChannelIdxInVector);
+ ui32 groupId = CurrentChannelGroup->first;
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID
<< " Collector retrying with"
<< " Group# " << groupId << " Channel# " << channelIdx
@@ -153,36 +214,101 @@ public:
return;
}
- void PrepareVector(const TDeque<TLogoBlobID> &in, THolder<TVector<TLogoBlobID>> &out) {
- ui64 size = in.size();
- if (size) {
- out.Reset(new TVector<TLogoBlobID>(size));
- ui64 outIdx = 0;
- for (const auto &blob: in) {
- (*out)[outIdx] = blob;
- ++outIdx;
- }
- Y_VERIFY(outIdx == size);
+ void Handle(TEvKeyValue::TEvContinueGC::TPtr &ev) {
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID
+ << " Collector continue GC Marker# KVC04");
+ MinGenStepInCircle = {};
+ CollectedDoNotKeep = std::move(ev->Get()->Buffer);
+ CollectedDoNotKeep.clear();
+ SendTheRequest(TActivationContext::AsActorContext());
+ }
+
+ void SendPartitialCompleteGC(bool endCircle) {
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID
+ << "end of round# " << (endCircle ? "yes" : "no")
+ << " Collector send PartitialCompleteGC Marker# KVC05");
+ auto ev = std::make_unique<TEvKeyValue::TEvPartitialCompleteGC>();
+ if (endCircle && MinGenStepInCircle) {
+ ev->CollectedGenerationStep = std::move(MinGenStepInCircle);
}
+ ev->CollectedDoNotKeep = std::move(CollectedDoNotKeep);
+
+ TActivationContext::Send(new IEventHandle(KeyValueActorId, SelfId(), ev.release()));
}
void SendTheRequest(const TActorContext &ctx) {
THolder<TVector<TLogoBlobID>> keep;
THolder<TVector<TLogoBlobID>> doNotKeep;
- PrepareVector(CollectorForGroupForChannel.begin()->second.begin()->second.Keep, keep);
- PrepareVector(CollectorForGroupForChannel.begin()->second.begin()->second.DoNotKeep, doNotKeep);
- ui32 channelIdx = CollectorForGroupForChannel.begin()->first;
- ui32 groupId = CollectorForGroupForChannel.begin()->second.begin()->first;
+
+ TGroupCollector &collector = CurrentChannelGroup->second;
+
+ ui32 doNotKeepSize = collector.DoNotKeep.size();
+ if (collector.Step < doNotKeepSize) {
+ doNotKeepSize -= collector.Step;
+ } else {
+ doNotKeepSize = 0;
+ }
+
+ if (doNotKeepSize && CollectedDoNotKeep.size() + doNotKeepSize > MaxCollectGarbageFlagsPerMessage) {
+ SendPartitialCompleteGC(false);
+ return;
+ }
+
+ if (doNotKeepSize) {
+ doNotKeepSize = Min(doNotKeepSize, (ui32)MaxCollectGarbageFlagsPerMessage);
+ doNotKeep.Reset(new TVector<TLogoBlobID>(doNotKeepSize));
+ auto begin = collector.DoNotKeep.begin() + collector.Step;
+ auto end = begin + doNotKeepSize;
+
+ collector.Step += doNotKeepSize;
+ Copy(begin, end, doNotKeep->begin());
+ Copy(doNotKeep->cbegin(), doNotKeep->cend(), std::back_inserter(CollectedDoNotKeep));
+ }
+
+ ui32 keepStartIdx = 0;
+ if (collector.Step >= collector.DoNotKeep.size()) {
+ keepStartIdx = collector.Step - collector.DoNotKeep.size();
+ }
+ ui32 keepSize = Min(collector.Keep.size() - keepStartIdx, MaxCollectGarbageFlagsPerMessage - doNotKeepSize);
+ if (keepSize) {
+ keep.Reset(new TVector<TLogoBlobID>(keepSize));
+ TMaybe<THelpers::TGenerationStep> collectedGenStep;
+ THelpers::TGenerationStep prevGenStep = THelpers::GenerationStep(collector.Keep.front());
+ auto begin = collector.Keep.begin() + keepStartIdx;
+ auto end = begin + keepSize;
+ ui32 idx = 0;
+ for (auto it = begin; it != end; ++it, ++idx) {
+ THelpers::TGenerationStep genStep = THelpers::GenerationStep(*it);
+ if (prevGenStep != genStep) {
+ collectedGenStep = prevGenStep;
+ prevGenStep = genStep;
+ }
+ (*keep)[idx] = *it;
+ }
+ collector.Step += idx;
+ if (collectedGenStep && MinGenStepInCircle) {
+ MinGenStepInCircle = Min(*MinGenStepInCircle, *collectedGenStep);
+ } else if (collectedGenStep) {
+ MinGenStepInCircle = collectedGenStep;
+ }
+ }
+
+ bool isLast = (collector.Keep.size() + collector.DoNotKeep.size() == collector.Step);
+
+ ui32 collectGeneration = CollectOperation->Header.CollectGeneration;
+ ui32 collectStep = CollectOperation->Header.CollectStep;
+ ui32 channelIdx = GetChannelIdxFromVecIdx(CollectorForGroupForChannel.size() - 1);
+ ui32 groupId = CurrentChannelGroup->first;
SendToBSProxy(ctx, groupId,
new TEvBlobStorage::TEvCollectGarbage(TabletInfo->TabletID, RecordGeneration, PerGenerationCounter,
- channelIdx, true,
- CollectOperation->Header.CollectGeneration, CollectOperation->Header.CollectStep,
+ channelIdx, isLast, collectGeneration, collectStep,
keep.Release(), doNotKeep.Release(), TInstant::Max(), true), (ui64)TKeyValueState::ECollectCookie::Soft);
}
STFUNC(StateWait) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
+ hFunc(TEvKeyValue::TEvContinueGC, Handle);
HFunc(TEvents::TEvWakeup, Handle);
HFunc(TEvents::TEvPoisonPill, Handle);
default:
diff --git a/ydb/core/keyvalue/keyvalue_collector_ut.cpp b/ydb/core/keyvalue/keyvalue_collector_ut.cpp
index e1704e8178..f414deca5c 100644
--- a/ydb/core/keyvalue/keyvalue_collector_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_collector_ut.cpp
@@ -225,5 +225,44 @@ Y_UNIT_TEST(TestKeyValueCollectorMultiple) {
UNIT_ASSERT(eraseCollect);
}
+
+Y_UNIT_TEST(TestKeyValueCollectorMany) {
+ TContext context;
+ context.Setup();
+
+ TVector<TLogoBlobID> keep;
+ TVector<TLogoBlobID> doNotKeep;
+ doNotKeep.reserve(MaxCollectGarbageFlagsPerMessage + 1);
+ for (ui32 idx = 1; idx <= MaxCollectGarbageFlagsPerMessage + 1; ++idx) {
+ doNotKeep.emplace_back(0x10010000001000Bull, idx, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10);
+ keep.emplace_back(0x10010000001000Bull, idx, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10);
+
+ }
+
+ TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep)));
+ context.SetActor(CreateKeyValueCollector(
+ context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true));
+
+ for (ui32 idx = 0; idx < 7; ++idx) {
+ TAutoPtr<IEventHandle> handle;
+ auto collect = context.GrabEvent<TEvBlobStorage::TEvCollectGarbage>(handle);
+ UNIT_ASSERT(collect);
+
+ context.Send(new TEvBlobStorage::TEvCollectGarbageResult(NKikimrProto::OK, collect->TabletId,
+ collect->RecordGeneration, collect->PerGenerationCounter, collect->Channel));
+
+ if (idx == 1 || idx == 5) {
+ auto complete = context.GrabEvent<TEvKeyValue::TEvPartitialCompleteGC>(handle);
+ complete->CollectedDoNotKeep.clear();
+ auto cont = std::make_unique<TEvKeyValue::TEvContinueGC>(std::move(complete->CollectedDoNotKeep));
+ context.Send(cont.release());
+ }
+ }
+
+ TAutoPtr<IEventHandle> handle;
+ auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvCompleteGC>(handle);
+ UNIT_ASSERT(eraseCollect);
+}
+
} // TKeyValueCollectorTest
} // NKikimr
diff --git a/ydb/core/keyvalue/keyvalue_events.h b/ydb/core/keyvalue/keyvalue_events.h
index d2f7931f8a..2b9a4f11a0 100644
--- a/ydb/core/keyvalue/keyvalue_events.h
+++ b/ydb/core/keyvalue/keyvalue_events.h
@@ -2,6 +2,7 @@
#include "defs.h"
#include "keyvalue_intermediate.h"
#include "keyvalue_request_stat.h"
+#include "keyvalue_helpers.h"
#include <ydb/public/lib/base/msgbus.h>
#include <ydb/core/keyvalue/protos/events.pb.h>
@@ -24,6 +25,8 @@ struct TEvKeyValue {
EvReportWriteLatency,
EvUpdateWeights,
EvCompleteGC,
+ EvPartitialCompleteGC,
+ EvContinueGC,
EvRead = EvRequest + 16,
EvReadRange,
@@ -198,6 +201,20 @@ struct TEvKeyValue {
struct TEvCompleteGC : public TEventLocal<TEvCompleteGC, EvCompleteGC> {
TEvCompleteGC() { }
};
+
+ struct TEvPartitialCompleteGC : public TEventLocal<TEvPartitialCompleteGC, EvPartitialCompleteGC> {
+ TMaybe<NKeyValue::THelpers::TGenerationStep> CollectedGenerationStep;
+ TVector<TLogoBlobID> CollectedDoNotKeep;
+
+ TEvPartitialCompleteGC() { }
+ };
+
+ struct TEvContinueGC : public TEventLocal<TEvContinueGC, EvContinueGC> {
+ TVector<TLogoBlobID> Buffer;
+ TEvContinueGC(TVector<TLogoBlobID> &&buffer)
+ : Buffer(std::move(buffer))
+ { }
+ };
};
} // NKikimr
diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h
index e0d696da48..788c78fac9 100644
--- a/ydb/core/keyvalue/keyvalue_flat_impl.h
+++ b/ydb/core/keyvalue/keyvalue_flat_impl.h
@@ -246,6 +246,7 @@ protected:
KV_SIMPLE_TX(EraseCollect);
KV_SIMPLE_TX(RegisterInitialGCCompletion);
KV_SIMPLE_TX(CompleteGC);
+ KV_SIMPLE_TX(PartitialCompleteGC);
TKeyValueState State;
TDeque<TAutoPtr<IEventHandle>> InitialEventsQueue;
@@ -318,6 +319,12 @@ protected:
Execute(new TTxCompleteGC(this), ctx);
}
+ void Handle(TEvKeyValue::TEvPartitialCompleteGC::TPtr &ev, const TActorContext &ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
+ << " Handle TEvPartitialCompleteGC " << ev->Get()->ToString());
+ Execute(new TTxPartitialCompleteGC(this), ctx);
+ }
+
void Handle(TEvKeyValue::TEvCollect::TPtr &ev, const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
<< " Handle TEvCollect " << ev->Get()->ToString());
@@ -332,7 +339,7 @@ protected:
}
CollectorActorId = ctx.RegisterWithSameMailbox(CreateKeyValueCollector(ctx.SelfID, State.GetCollectOperation(),
Info(), Executor()->Generation(), State.GetPerGenerationCounter(), isSpringCleanup));
- State.OnEvCollectDone(perGenerationCounterStepSize, ctx);
+ State.OnEvCollectDone(perGenerationCounterStepSize, CollectorActorId, ctx);
} else {
LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
<< " Handle TEvCollect: PerGenerationCounter overflow prevention restart.");
@@ -509,6 +516,7 @@ public:
HFunc(TEvKeyValue::TEvEraseCollect, Handle);
HFunc(TEvKeyValue::TEvCompleteGC, Handle);
+ HFunc(TEvKeyValue::TEvPartitialCompleteGC, Handle);
HFunc(TEvKeyValue::TEvCollect, Handle);
HFunc(TEvKeyValue::TEvStoreCollect, Handle);
HFunc(TEvKeyValue::TEvRequest, Handle);
diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h
index 418be9d2a5..2c3b0e431f 100644
--- a/ydb/core/keyvalue/keyvalue_state.h
+++ b/ydb/core/keyvalue/keyvalue_state.h
@@ -265,6 +265,7 @@ protected:
ui64 TabletId;
TActorId KeyValueActorId;
+ TActorId CollectorActorId;
ui32 ExecutorGeneration;
bool IsStatePresent;
bool IsEmptyDbStart;
@@ -289,6 +290,9 @@ protected:
NMetrics::TResourceMetrics* ResourceMetrics;
+ TMaybe<NKeyValue::THelpers::TGenerationStep> PartitialCollectedGenerationStep;
+ TVector<TLogoBlobID> PartitialCollectedDoNotKeep;
+
public:
TKeyValueState();
void Clear();
@@ -330,22 +334,30 @@ public:
// garbage collection methods
void PrepareCollectIfNeeded(const TActorContext &ctx);
+ void RemoveFromTrashDoNotKeep(ISimpleDb &db, const TActorContext &ctx, const TVector<TLogoBlobID> &collectedDoNotKeep);
+ void RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep);
+ void UpdateStoredState(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep);
void UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState);
+ void UpdateAfterPartitialGC(ISimpleDb &db, const TActorContext &ctx);
void StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx);
void StoreCollectComplete(const TActorContext &ctx);
void EraseCollectExecute(ISimpleDb &db, const TActorContext &ctx);
void EraseCollectComplete(const TActorContext &ctx);
void CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx);
void CompleteGCComplete(const TActorContext &ctx);
+ void PartitialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx);
+ void PartitialCompleteGCComplete(const TActorContext &ctx);
void SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep);
void StartGC(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep);
void StartCollectingIfPossible(const TActorContext &ctx);
ui64 OnEvCollect(const TActorContext &ctx);
- void OnEvCollectDone(ui64 perGenerationCounterStepSize, const TActorContext &ctx);
+ void OnEvCollectDone(ui64 perGenerationCounterStepSize, TActorId collector, const TActorContext &ctx);
void OnEvEraseCollect(const TActorContext &ctx);
void OnEvCompleteGC();
+ void OnEvPartitialCompleteGC(TEvKeyValue::TEvPartitialCompleteGC *ev);
+
void Reply(THolder<TIntermediate> &intermediate, const TActorContext &ctx, const TTabletStorageInfo *info);
void ProcessCmd(TIntermediate::TRead &read,
diff --git a/ydb/core/keyvalue/keyvalue_state_collect.cpp b/ydb/core/keyvalue/keyvalue_state_collect.cpp
index 8458532b29..4651509c4c 100644
--- a/ydb/core/keyvalue/keyvalue_state_collect.cpp
+++ b/ydb/core/keyvalue/keyvalue_state_collect.cpp
@@ -49,6 +49,56 @@ void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) {
}
+
+void TKeyValueState::RemoveFromTrashDoNotKeep(ISimpleDb &db, const TActorContext &ctx,
+ const TVector<TLogoBlobID> &collectedDoNotKeep)
+{
+ for (const TLogoBlobID &id : collectedDoNotKeep) {
+ THelpers::DbEraseTrash(id, db, ctx);
+ ui32 num = Trash.erase(id);
+ Y_VERIFY(num == 1);
+ CountTrashCollected(id.BlobSize());
+ }
+}
+
+void TKeyValueState::RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorContext &ctx,
+ const NKeyValue::THelpers::TGenerationStep &genStep)
+{
+ ui64 storedCollectGeneration = StoredState.GetCollectGeneration();
+ ui64 storedCollectStep = StoredState.GetCollectStep();
+
+ // remove trash entries that were not marked as 'Keep' before, but which are automatically deleted by this barrier
+ // to prevent them from being added to 'DoNotKeep' list after
+ for (auto it = Trash.begin(); it != Trash.end(); ) {
+ THelpers::TGenerationStep trashGenStep = THelpers::GenerationStep(*it);
+ bool afterStoredSoftBarrier = trashGenStep > THelpers::TGenerationStep(storedCollectGeneration, storedCollectStep);
+ bool beforeSoftBarrier = trashGenStep <= genStep;
+ if (afterStoredSoftBarrier && beforeSoftBarrier) {
+ CountTrashCollected(it->BlobSize());
+ THelpers::DbEraseTrash(*it, db, ctx);
+ it = Trash.erase(it);
+ } else {
+ ++it;
+ }
+ }
+}
+
+void TKeyValueState::UpdateStoredState(ISimpleDb &db, const TActorContext &ctx,
+ const NKeyValue::THelpers::TGenerationStep &genStep)
+{
+ StoredState.SetCollectGeneration(std::get<0>(genStep));
+ StoredState.SetCollectStep(std::get<1>(genStep));
+ THelpers::DbUpdateState(StoredState, db, ctx);
+}
+
+void TKeyValueState::UpdateAfterPartitialGC(ISimpleDb &db, const TActorContext &ctx) {
+ RemoveFromTrashDoNotKeep(db, ctx, PartitialCollectedDoNotKeep);
+ if (PartitialCollectedGenerationStep) {
+ RemoveFromTrashBySoftBarrier(db, ctx, *PartitialCollectedGenerationStep);
+ UpdateStoredState(db, ctx, *PartitialCollectedGenerationStep);
+ }
+}
+
void TKeyValueState::UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState) {
if (IsDamaged) {
return;
@@ -57,11 +107,9 @@ void TKeyValueState::UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool upda
ui64 collectGeneration = CollectOperation->Header.GetCollectGeneration();
ui64 collectStep = CollectOperation->Header.GetCollectStep();
+ auto collectGenStep = THelpers::TGenerationStep(collectGeneration, collectStep);
if (updateTrash) {
- ui64 storedCollectGeneration = StoredState.GetCollectGeneration();
- ui64 storedCollectStep = StoredState.GetCollectStep();
-
for (TLogoBlobID &id: CollectOperation->DoNotKeep) {
THelpers::DbEraseTrash(id, db, ctx);
ui32 num = Trash.erase(id);
@@ -69,27 +117,13 @@ void TKeyValueState::UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool upda
CountTrashCollected(id.BlobSize());
}
- // remove trash entries that were not marked as 'Keep' before, but which are automatically deleted by this barrier
- // to prevent them from being added to 'DoNotKeep' list after
- for (auto it = Trash.begin(); it != Trash.end(); ) {
- THelpers::TGenerationStep trashGenStep = THelpers::GenerationStep(*it);
- bool afterStoredSoftBarrier = trashGenStep > THelpers::TGenerationStep(storedCollectGeneration, storedCollectStep);
- bool beforeSoftBarrier = trashGenStep <= THelpers::TGenerationStep(collectGeneration, collectStep);
- if (afterStoredSoftBarrier && beforeSoftBarrier) {
- CountTrashCollected(it->BlobSize());
- THelpers::DbEraseTrash(*it, db, ctx);
- it = Trash.erase(it);
- } else {
- ++it;
- }
- }
+ RemoveFromTrashBySoftBarrier(db, ctx, collectGenStep);
}
if (updateState) {
- StoredState.SetCollectGeneration(collectGeneration);
- StoredState.SetCollectStep(collectStep);
- THelpers::DbUpdateState(StoredState, db, ctx);
+ UpdateStoredState(db, ctx, collectGenStep);
}
+
}
void TKeyValueState::StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx) {
@@ -150,6 +184,16 @@ void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) {
PrepareCollectIfNeeded(ctx);
}
+
+void TKeyValueState::PartitialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) {
+ UpdateAfterPartitialGC(db, ctx);
+}
+
+void TKeyValueState::PartitialCompleteGCComplete(const TActorContext &ctx) {
+ PartitialCollectedDoNotKeep.clear();
+ ctx.Send(CollectorActorId, new TEvKeyValue::TEvContinueGC(std::move(PartitialCollectedDoNotKeep)));
+}
+
// Prepare the completely new full collect operation with the same gen/step, but with correct keep & doNotKeep lists
void TKeyValueState::SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep) {
@@ -242,10 +286,11 @@ ui64 TKeyValueState::OnEvCollect(const TActorContext &ctx) {
return perGenerationCounterStepSize;
}
-void TKeyValueState::OnEvCollectDone(ui64 perGenerationCounterStepSize, const TActorContext &ctx) {
+void TKeyValueState::OnEvCollectDone(ui64 perGenerationCounterStepSize, TActorId collector, const TActorContext &ctx) {
Y_UNUSED(ctx);
Y_VERIFY(perGenerationCounterStepSize >= 1);
PerGenerationCounter += perGenerationCounterStepSize;
+ CollectorActorId = collector;
}
void TKeyValueState::OnEvEraseCollect(const TActorContext &ctx) {
@@ -257,6 +302,11 @@ void TKeyValueState::OnEvCompleteGC() {
CountLatencyBsCollect();
}
+void TKeyValueState::OnEvPartitialCompleteGC(TEvKeyValue::TEvPartitialCompleteGC *ev) {
+ PartitialCollectedGenerationStep = std::move(ev->CollectedGenerationStep);
+ PartitialCollectedDoNotKeep = std::move(ev->CollectedDoNotKeep);
+}
+
} // NKeyValue
} // NKikimr