aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-07-20 15:59:55 +0300
committeralexvru <alexvru@ydb.tech>2023-07-20 15:59:55 +0300
commitcb49856f9541f9089e06bbd91524ba418919037d (patch)
treee38f111ea3919a34de7b5c779381eb9e5ce1451a
parent629dca0f6d0e10c2a76afdebaa75bd9042b8eaaa (diff)
downloadydb-cb49856f9541f9089e06bbd91524ba418919037d.tar.gz
Fix blob leakage KIKIMR-18784
-rw-r--r--ydb/core/keyvalue/keyvalue_collect_operation.h4
-rw-r--r--ydb/core/keyvalue/keyvalue_collector.cpp32
-rw-r--r--ydb/core/keyvalue/keyvalue_collector_ut.cpp47
-rw-r--r--ydb/core/keyvalue/keyvalue_events.h28
-rw-r--r--ydb/core/keyvalue/keyvalue_flat_impl.h26
-rw-r--r--ydb/core/keyvalue/keyvalue_state.cpp107
-rw-r--r--ydb/core/keyvalue/keyvalue_state.h26
-rw-r--r--ydb/core/keyvalue/keyvalue_state_collect.cpp258
-rw-r--r--ydb/core/keyvalue/keyvalue_ut.cpp3
9 files changed, 126 insertions, 405 deletions
diff --git a/ydb/core/keyvalue/keyvalue_collect_operation.h b/ydb/core/keyvalue/keyvalue_collect_operation.h
index 49c67c34cf7..1fd19da441a 100644
--- a/ydb/core/keyvalue/keyvalue_collect_operation.h
+++ b/ydb/core/keyvalue/keyvalue_collect_operation.h
@@ -50,12 +50,14 @@ struct TCollectOperation : public TThrRefBase {
TCollectOperationHeader Header;
TVector<TLogoBlobID> Keep;
TVector<TLogoBlobID> DoNotKeep;
+ TVector<TLogoBlobID> TrashGoingToCollect;
TCollectOperation(ui64 collectGeneration, ui64 collectStep,
- TVector<TLogoBlobID> &&keep, TVector<TLogoBlobID> &&doNotKeep)
+ TVector<TLogoBlobID> &&keep, TVector<TLogoBlobID> &&doNotKeep, TVector<TLogoBlobID>&& trashGoingToCollect)
: Header(collectGeneration, collectStep, keep, doNotKeep)
, Keep(std::move(keep))
, DoNotKeep(std::move(doNotKeep))
+ , TrashGoingToCollect(std::move(trashGoingToCollect))
{}
};
diff --git a/ydb/core/keyvalue/keyvalue_collector.cpp b/ydb/core/keyvalue/keyvalue_collector.cpp
index b4c72dbd148..eeb2585ff84 100644
--- a/ydb/core/keyvalue/keyvalue_collector.cpp
+++ b/ydb/core/keyvalue/keyvalue_collector.cpp
@@ -34,9 +34,6 @@ class TKeyValueCollector : public TActorBootstrapped<TKeyValueCollector> {
TVector<TMap<ui32, TGroupCollector>> CollectorForGroupForChannel;
ui32 EndChannel = 0;
- // For DoNotKeep
- TVector<TLogoBlobID> CollectedDoNotKeep;
-
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KEYVALUE_ACTOR;
@@ -106,7 +103,6 @@ public:
maxDoNotKeepSizeInGroupChannel = Max(maxDoNotKeepSizeInGroupChannel, collector.DoNotKeep.size());
}
}
- CollectedDoNotKeep.reserve(Min(maxDoNotKeepSizeInGroupChannel, MaxCollectGarbageFlagsPerMessage));
SendTheRequest();
Become(&TThis::StateWait);
@@ -158,10 +154,6 @@ public:
(TabletId, TabletInfo->TabletID), (Channel, GetCurretChannelId()));
CollectorForGroupForChannel.pop_back();
}
- if (CollectedDoNotKeep.size()) {
- SendPartialCompleteGC();
- return;
- }
if (CollectorForGroupForChannel.empty()) {
SendCompleteGCAndDie();
return;
@@ -196,14 +188,6 @@ public:
}
}
- void SendPartialCompleteGC() {
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC14, "Collector send PartialCompleteGC",
- (TabletId, TabletInfo->TabletID),
- (FirstDoNotKeep, (CollectedDoNotKeep.size() ? CollectedDoNotKeep[0].ToString() : "none")),
- (CollectedDoNotKeepSize, CollectedDoNotKeep.size()));
- Send(KeyValueActorId, new TEvKeyValue::TEvPartialCompleteGC(std::move(CollectedDoNotKeep)));
- }
-
void SendCompleteGCAndDie() {
STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC19, "Collector send CompleteGC",
(TabletId, TabletInfo->TabletID));
@@ -251,9 +235,6 @@ public:
collector.NextCountOfSentFlags += doNotKeepSize;
Copy(begin, end, doNotKeep->begin());
- if (!IsRepeatedRequest) {
- Copy(doNotKeep->cbegin(), doNotKeep->cend(), std::back_inserter(CollectedDoNotKeep));
- }
}
ui32 keepStartIdx = 0;
@@ -298,18 +279,6 @@ public:
IsRepeatedRequest = false;
}
- void HandleContinueGC(TEvKeyValue::TEvContinueGC::TPtr &ev) {
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC13, "Collector continue GC",
- (TabletId, TabletInfo->TabletID));
- if (CollectorForGroupForChannel.empty()) {
- SendCompleteGCAndDie();
- return;
- }
- CollectedDoNotKeep = std::move(ev->Get()->Buffer);
- CollectedDoNotKeep.clear();
- SendTheRequest();
- }
-
void HandleWakeUp() {
auto collectorsOfCurrentChannel = CollectorForGroupForChannel.rbegin();
if (collectorsOfCurrentChannel == CollectorForGroupForChannel.rend()) {
@@ -352,7 +321,6 @@ public:
STATEFN(StateWait) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
- hFunc(TEvKeyValue::TEvContinueGC, HandleContinueGC);
cFunc(TEvents::TEvWakeup::EventType, HandleWakeUp);
cFunc(TEvents::TEvPoisonPill::EventType, HandlePoisonPill);
default:
diff --git a/ydb/core/keyvalue/keyvalue_collector_ut.cpp b/ydb/core/keyvalue/keyvalue_collector_ut.cpp
index 94d8314b8e9..a7926ca3b8c 100644
--- a/ydb/core/keyvalue/keyvalue_collector_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_collector_ut.cpp
@@ -117,7 +117,7 @@ Y_UNIT_TEST(TestKeyValueCollectorEmpty) {
TVector<TLogoBlobID> keep;
TVector<TLogoBlobID> doNotKeep;
- TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep)));
+ TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep), {}));
context.SetActor(CreateKeyValueCollector(
context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true));
@@ -144,7 +144,7 @@ Y_UNIT_TEST(TestKeyValueCollectorSingle) {
TVector<TLogoBlobID> keep;
keep.emplace_back(0x10010000001000Bull, 5, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10);
TVector<TLogoBlobID> doNotKeep;
- TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep)));
+ TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep), {}));
context.SetActor(CreateKeyValueCollector(
context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true));
@@ -180,7 +180,7 @@ Y_UNIT_TEST(TestKeyValueCollectorSingleWithOneError) {
TVector<TLogoBlobID> keep;
keep.emplace_back(0x10010000001000Bull, 5, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10);
TVector<TLogoBlobID> doNotKeep;
- TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep)));
+ TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep), {}));
context.SetActor(CreateKeyValueCollector(
context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true));
@@ -242,7 +242,7 @@ Y_UNIT_TEST(TestKeyValueCollectorMultiple) {
ids.insert(doNotKeep[i]);
}
- TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep)));
+ TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep), {}));
context.SetActor(CreateKeyValueCollector(
context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true));
@@ -275,16 +275,6 @@ Y_UNIT_TEST(TestKeyValueCollectorMultiple) {
context.Send(new TEvBlobStorage::TEvCollectGarbageResult(NKikimrProto::OK, collect->TabletId,
collect->RecordGeneration, collect->PerGenerationCounter, collect->Channel));
-
- if (collect && collect->DoNotKeep && collect->DoNotKeep->size()) {
- auto complete = context.GrabEvent<TEvKeyValue::TEvPartialCompleteGC>(handle);
- THashSet<TLogoBlobID> uniqueBlobs;
- uniqueBlobs.insert(complete->CollectedDoNotKeep.begin(), complete->CollectedDoNotKeep.end());
- UNIT_ASSERT_VALUES_EQUAL(uniqueBlobs.size(), complete->CollectedDoNotKeep.size());
- complete->CollectedDoNotKeep.clear();
- auto cont = std::make_unique<TEvKeyValue::TEvContinueGC>(std::move(complete->CollectedDoNotKeep));
- context.Send(cont.release());
- }
}
UNIT_ASSERT(erased == 8);
@@ -307,11 +297,11 @@ Y_UNIT_TEST(TestKeyValueCollectorMany) {
keep.emplace_back(0x10010000001000Bull, idx, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10);
}
- TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep)));
+ TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep), {}));
context.SetActor(CreateKeyValueCollector(
context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true));
- auto handleGC = [&](bool withContinueGC, ui32 keepSize, ui32 doNotKeepSize) {
+ auto handleGC = [&](ui32 keepSize, ui32 doNotKeepSize) {
TAutoPtr<IEventHandle> handle;
auto collect = context.GrabEvent<TEvBlobStorage::TEvCollectGarbage>(handle);
UNIT_ASSERT(collect);
@@ -319,24 +309,17 @@ Y_UNIT_TEST(TestKeyValueCollectorMany) {
UNIT_ASSERT_VALUES_EQUAL((collect->DoNotKeep ? collect->DoNotKeep->size() : 0), doNotKeepSize);
context.Send(new TEvBlobStorage::TEvCollectGarbageResult(NKikimrProto::OK, collect->TabletId,
collect->RecordGeneration, collect->PerGenerationCounter, collect->Channel));
-
- if (withContinueGC) {
- auto complete = context.GrabEvent<TEvKeyValue::TEvPartialCompleteGC>(handle);
- complete->CollectedDoNotKeep.clear();
- auto cont = std::make_unique<TEvKeyValue::TEvContinueGC>(std::move(complete->CollectedDoNotKeep));
- context.Send(cont.release());
- }
};
- handleGC(true, 20, 20); // group 0
- handleGC(true, 10, 10); // group 1
- handleGC(true, 0, 10'000); // group 2 DoNotKeep 30..10029
- handleGC(true, 30, 9'970); // group 2 DoNotKeep 10030..19999 Keep 30.59
- handleGC(false, 10'000, 0); // group 2 Keep 60..10059
- handleGC(false, 9'940, 0); // group 2 Keep 10060..20000
- handleGC(false, 0, 0); // group 3
- handleGC(false, 0, 0); // group 4
- handleGC(false, 0, 0); // group 5
+ handleGC(20, 20); // group 0
+ handleGC(10, 10); // group 1
+ handleGC(0, 10'000); // group 2 DoNotKeep 30..10029
+ handleGC(30, 9'970); // group 2 DoNotKeep 10030..19999 Keep 30.59
+ handleGC(10'000, 0); // group 2 Keep 60..10059
+ handleGC(9'940, 0); // group 2 Keep 10060..20000
+ handleGC(0, 0); // group 3
+ handleGC(0, 0); // group 4
+ handleGC(0, 0); // group 5
TAutoPtr<IEventHandle> handle;
auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvCompleteGC>(handle);
diff --git a/ydb/core/keyvalue/keyvalue_events.h b/ydb/core/keyvalue/keyvalue_events.h
index 95ccf72ef50..2c64e120e30 100644
--- a/ydb/core/keyvalue/keyvalue_events.h
+++ b/ydb/core/keyvalue/keyvalue_events.h
@@ -20,13 +20,10 @@ struct TEvKeyValue {
EvNotify,
EvStoreCollect,
EvCollect,
- EvEraseCollect,
EvPeriodicRefresh,
EvReportWriteLatency,
EvUpdateWeights,
EvCompleteGC,
- EvPartialCompleteGC,
- EvContinueGC,
EvRead = EvRequest + 16,
EvReadRange,
@@ -185,18 +182,10 @@ struct TEvKeyValue {
}
};
- struct TEvStoreCollect : public TEventLocal<TEvStoreCollect, EvStoreCollect> {
- TEvStoreCollect() { }
- };
-
struct TEvCollect : public TEventLocal<TEvCollect, EvCollect> {
TEvCollect() { }
};
- struct TEvEraseCollect : public TEventLocal<TEvEraseCollect, EvEraseCollect> {
- TEvEraseCollect() { }
- };
-
struct TEvPeriodicRefresh : public TEventLocal<TEvPeriodicRefresh, EvPeriodicRefresh> {
TEvPeriodicRefresh() { }
};
@@ -204,23 +193,6 @@ struct TEvKeyValue {
struct TEvCompleteGC : public TEventLocal<TEvCompleteGC, EvCompleteGC> {
TEvCompleteGC() { }
};
-
- struct TEvPartialCompleteGC : public TEventLocal<TEvPartialCompleteGC, EvPartialCompleteGC> {
- TVector<TLogoBlobID> CollectedDoNotKeep;
-
- TEvPartialCompleteGC() { }
-
- TEvPartialCompleteGC(TVector<TLogoBlobID> &&doNotKeeps)
- : CollectedDoNotKeep(std::move(doNotKeeps))
- { }
- };
-
- struct TEvContinueGC : public TEventLocal<TEvContinueGC, EvContinueGC> {
- TVector<TLogoBlobID> Buffer;
- TEvContinueGC(TVector<TLogoBlobID> &&buffer)
- : Buffer(std::move(buffer))
- { }
- };
};
} // NKikimr
diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h
index d5f6211438e..5e97a383c1d 100644
--- a/ydb/core/keyvalue/keyvalue_flat_impl.h
+++ b/ydb/core/keyvalue/keyvalue_flat_impl.h
@@ -265,11 +265,8 @@ protected:
}
#endif
- KV_SIMPLE_TX(StoreCollect);
- KV_SIMPLE_TX(EraseCollect);
KV_SIMPLE_TX(RegisterInitialGCCompletion);
KV_SIMPLE_TX(CompleteGC);
- KV_SIMPLE_TX(PartialCompleteGC);
TKeyValueState State;
TDeque<TAutoPtr<IEventHandle>> InitialEventsQueue;
@@ -332,13 +329,6 @@ protected:
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Online state
- void Handle(TEvKeyValue::TEvEraseCollect::TPtr &ev, const TActorContext &ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
- << " Handle TEvEraseCollect " << ev->Get()->ToString());
- State.OnEvEraseCollect(ctx);
- Execute(new TTxEraseCollect(this), ctx);
- }
-
void Handle(TEvKeyValue::TEvCompleteGC::TPtr &ev, const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
<< " Handle TEvCompleteGC " << ev->Get()->ToString());
@@ -346,13 +336,6 @@ protected:
Execute(new TTxCompleteGC(this), ctx);
}
- void Handle(TEvKeyValue::TEvPartialCompleteGC::TPtr &ev, const TActorContext &ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
- << " Handle TEvPartitialCompleteGC " << ev->Get()->ToString());
- State.OnEvPartialCompleteGC(ev->Get());
- Execute(new TTxPartialCompleteGC(this), ctx);
- }
-
void Handle(TEvKeyValue::TEvCollect::TPtr &ev, const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
<< " Handle TEvCollect " << ev->Get()->ToString());
@@ -376,12 +359,6 @@ protected:
}
}
- void Handle(TEvKeyValue::TEvStoreCollect::TPtr &ev, const TActorContext &ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
- << " Handle TEvStoreCollect " << ev->Get()->ToString());
- Execute(new TTxStoreCollect(this), ctx);
- }
-
void CheckYellowChannels(TRequestStat& stat) {
IExecutor* executor = Executor();
if ((stat.YellowMoveChannels || stat.YellowStopChannels) && executor) {
@@ -545,11 +522,8 @@ public:
hFunc(TEvKeyValue::TEvGetStorageChannelStatus, Handle);
hFunc(TEvKeyValue::TEvAcquireLock, Handle);
- HFunc(TEvKeyValue::TEvEraseCollect, Handle);
HFunc(TEvKeyValue::TEvCompleteGC, Handle);
- HFunc(TEvKeyValue::TEvPartialCompleteGC, Handle);
HFunc(TEvKeyValue::TEvCollect, Handle);
- HFunc(TEvKeyValue::TEvStoreCollect, Handle);
HFunc(TEvKeyValue::TEvRequest, Handle);
HFunc(TEvKeyValue::TEvIntermediate, Handle);
HFunc(TEvKeyValue::TEvNotify, Handle);
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp
index 3d162cb7b73..9ea2fb90864 100644
--- a/ydb/core/keyvalue/keyvalue_state.cpp
+++ b/ydb/core/keyvalue/keyvalue_state.cpp
@@ -438,28 +438,8 @@ void TKeyValueState::Load(const TString &key, const TString& value) {
break;
}
case EIT_COLLECT: {
+ // just ignore the record
Y_VERIFY(arbitraryPart.size() == 0);
- Y_VERIFY(!CollectOperation.Get());
- Y_VERIFY(value.size() >= sizeof(TCollectOperationHeader));
- const TCollectOperationHeader *header = (const TCollectOperationHeader*)value.data();
- Y_VERIFY(header->DataHeader.ItemType == EIT_COLLECT);
- ui64 totalSize = sizeof(TCollectOperationHeader)
- + sizeof(TLogoBlobID) * (header->KeepCount + header->DoNotKeepCount);
- Y_VERIFY(value.size() == totalSize);
- TVector<TLogoBlobID> keep;
- TVector<TLogoBlobID> doNotKeep;
- keep.resize(header->KeepCount);
- doNotKeep.resize(header->DoNotKeepCount);
- const char* data = value.data() + sizeof(TCollectOperationHeader);
- if (keep.size()) {
- memcpy((char *) &keep[0], data, sizeof(TLogoBlobID) * keep.size());
- }
- data += sizeof(TLogoBlobID) * keep.size();
- if (doNotKeep.size()) {
- memcpy((char *) &doNotKeep[0], data, sizeof(TLogoBlobID) * doNotKeep.size());
- }
- CollectOperation.Reset(new TCollectOperation(
- header->CollectGeneration, header->CollectStep, std::move(keep), std::move(doNotKeep)));
break;
}
case EIT_STATE: {
@@ -601,12 +581,6 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
}
}
- if (CollectOperation) {
- for (const TLogoBlobID &id : CollectOperation->Keep) {
- addBlobToKeep(id);
- }
- }
-
for (const auto &channelInfo : info->Channels) {
if (channelInfo.Channel < BLOB_CHANNEL) {
continue;
@@ -630,10 +604,11 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
std::tie(group, channel) = keepInfo.first;
THolder<TVector<TLogoBlobID>>& keep = keepInfo.second;
+ const ui32 step = TEvBlobStorage::TEvCollectGarbage::PerGenerationCounterStepSize(keep.Get(), nullptr);
auto ev = MakeHolder<TEvBlobStorage::TEvCollectGarbage>(info->TabletID, executorGeneration,
PerGenerationCounter, channel, true /*collect*/, barrierGeneration, barrierStep, keep.Release(),
nullptr /*doNotKeep*/, TInstant::Max(), false /*isMultiCollectAllowed*/, false /*hard*/);
- ++PerGenerationCounter;
+ PerGenerationCounter += step;
const TActorId nodeWarden = MakeBlobStorageNodeWardenID(ctx.SelfID.NodeId());
const TActorId proxy = MakeBlobStorageProxyID(group);
@@ -686,40 +661,10 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
ChannelRangeSets[id.Channel()].Remove(id.Generation());
}
}
- if (CollectOperation) {
- auto &keep = CollectOperation->Keep;
- for (auto it = keep.begin(); it != keep.end(); ++it) {
- const TLogoBlobID &id = *it;
- if (id.Channel() < ChannelRangeSets.size()) {
- ChannelRangeSets[id.Channel()].Remove(id.Generation());
- }
- }
- auto &doNotKeep = CollectOperation->DoNotKeep;
- for (auto it = doNotKeep.begin(); it != doNotKeep.end(); ++it) {
- const TLogoBlobID &id = *it;
- if (id.Channel() < ChannelRangeSets.size()) {
- ChannelRangeSets[id.Channel()].Remove(id.Generation());
- }
- }
- // Patch collect operation generation and step
- Y_VERIFY(CollectOperation->Header.GetCollectGeneration() < ExecutorGeneration);
- CollectOperation->Header.SetCollectGeneration(ExecutorGeneration);
- CollectOperation->Header.SetCollectStep(0);
- }
-
- if (CollectOperation) {
- for (const TLogoBlobID &id : CollectOperation->DoNotKeep) {
- Trash.insert(id);
- THelpers::DbUpdateTrash(id, db, ctx);
- }
- THelpers::DbEraseCollect(db, ctx);
- CollectOperation = nullptr;
- }
-
+ THelpers::DbEraseCollect(db, ctx);
THelpers::DbUpdateState(StoredState, db, ctx);
-
// corner case, if no CollectGarbage events were sent
if (InitialCollectsSent == 0) {
SendCutHistory(ctx);
@@ -798,7 +743,7 @@ void TKeyValueState::Step() {
}
}
-TLogoBlobID TKeyValueState::AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx) {
+TLogoBlobID TKeyValueState::AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx, ui64 requestUid) {
ui32 generation = ExecutorGeneration;
TLogoBlobID id(TabletId, generation, NextLogoBlobStep, storageChannelIdx, size, NextLogoBlobCookie);
if (NextLogoBlobCookie < TLogoBlobID::MaxCookie) {
@@ -808,6 +753,10 @@ TLogoBlobID TKeyValueState::AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx
}
Y_VERIFY(!CollectOperation || THelpers::GenerationStep(id) >
THelpers::TGenerationStep(CollectOperation->Header.GetCollectGeneration(), CollectOperation->Header.GetCollectStep()));
+ if (requestUid) {
+ ++InFlightForStep[id.Step()];
+ ++RequestUidStepToCount[std::make_tuple(requestUid, id.Step())];
+ }
return id;
}
@@ -1363,7 +1312,7 @@ void TKeyValueState::CmdTrimLeakedBlobs(THolder<TIntermediate>& intermediate, IS
Y_VERIFY(it->second != 0);
} else if (!Trash.count(id)) { // we found a candidate for trash
if (numItems < intermediate->TrimLeakedBlobs->MaxItemsToTrim) {
- LOG_WARN_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " trimming " << id.ToString());
+ LOG_WARN_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " trimming " << id);
Trash.insert(id);
CountTrashRecord(id.BlobSize());
THelpers::DbUpdateTrash(id, db, ctx);
@@ -1753,15 +1702,21 @@ void TKeyValueState::OnRequestComplete(ui64 requestUid, ui64 generation, ui64 st
CountRequestTakeOffOrEnqueue(requestType);
}
- if (StoredState.GetChannelGeneration() == generation) {
- auto it = InFlightForStep.find(step);
- Y_VERIFY(it != InFlightForStep.end(), "Unexpected step# %" PRIu64, (ui64)step);
- it->second--;
- if (it->second == 0) {
- InFlightForStep.erase(it);
+ CancelInFlight(requestUid);
+ PrepareCollectIfNeeded(ctx);
+}
- // Initiate Garbage collection process if needed
- StartCollectingIfPossible(ctx);
+void TKeyValueState::CancelInFlight(ui64 requestUid) {
+ for (auto it = RequestUidStepToCount.lower_bound(std::make_tuple(requestUid, 0)); it != RequestUidStepToCount.end() &&
+ std::get<0>(it->first) == requestUid; it = RequestUidStepToCount.erase(it)) {
+ const auto& [requestUid, step] = it->first;
+ const ui32 drop = it->second;
+ auto stepIt = InFlightForStep.find(step);
+ Y_VERIFY(stepIt != InFlightForStep.end() && drop <= stepIt->second);
+ if (drop < stepIt->second) {
+ stepIt->second -= drop;
+ } else {
+ InFlightForStep.erase(stepIt);
}
}
}
@@ -2210,7 +2165,7 @@ void TKeyValueState::SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, u
ui64 sizeRemain = cmd.Data.size();
while (sizeRemain) {
ui32 blobSize = Min<ui64>(sizeRemain, 8 << 20);
- cmd.LogoBlobIds.push_back(AllocateLogoBlobId(blobSize, storageChannelIdx));
+ cmd.LogoBlobIds.push_back(AllocateLogoBlobId(blobSize, storageChannelIdx, intermediate->RequestUid));
sizeRemain -= blobSize;
}
for (const TLogoBlobID& logoBlobId : cmd.LogoBlobIds) {
@@ -2337,7 +2292,7 @@ TKeyValueState::TPrepareResult TKeyValueState::InitGetStatusCommand(TIntermediat
}
cmd.StorageChannel = storageChannel;
- cmd.LogoBlobId = AllocateLogoBlobId(1, storageChannelIdx);
+ cmd.LogoBlobId = AllocateLogoBlobId(1, storageChannelIdx, 0);
cmd.Status = NKikimrProto::UNKNOWN;
}
return {false, msg};
@@ -2925,7 +2880,6 @@ void TKeyValueState::OnEvReadRequest(TEvKeyValue::TEvRead::TPtr &ev, const TActo
CountRequestIncoming(requestType);
if (PrepareReadRequest(ctx, ev, intermediate, &requestType)) {
- ++InFlightForStep[StoredState.GetChannelStep()];
if (requestType == TRequestType::ReadOnlyInline) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create storage inline read request, Marker# KV49");
@@ -2962,7 +2916,6 @@ void TKeyValueState::OnEvReadRangeRequest(TEvKeyValue::TEvReadRange::TPtr &ev, c
CountRequestIncoming(requestType);
if (PrepareReadRangeRequest(ctx, ev, intermediate, &requestType)) {
- ++InFlightForStep[StoredState.GetChannelStep()];
if (requestType == TRequestType::ReadOnlyInline) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create storage inline read range request, Marker# KV58");
@@ -3000,7 +2953,6 @@ void TKeyValueState::OnEvExecuteTransaction(TEvKeyValue::TEvExecuteTransaction::
CountRequestIncoming(requestType);
if (PrepareExecuteTransactionRequest(ctx, ev, intermediate, info)) {
- ++InFlightForStep[StoredState.GetChannelStep()];
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create storage request for WO, Marker# KV67");
RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
@@ -3009,6 +2961,7 @@ void TKeyValueState::OnEvExecuteTransaction(TEvKeyValue::TEvExecuteTransaction::
} else {
intermediate->UpdateStat();
CountRequestOtherError(requestType);
+ CancelInFlight(intermediate->RequestUid);
}
}
@@ -3024,7 +2977,6 @@ void TKeyValueState::OnEvGetStorageChannelStatus(TEvKeyValue::TEvGetStorageChann
CountRequestIncoming(requestType);
if (PrepareGetStorageChannelStatusRequest(ctx, ev, intermediate, info)) {
- ++InFlightForStep[StoredState.GetChannelStep()];
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create GetStorageChannelStatus request, Marker# KV75");
RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
@@ -3048,7 +3000,6 @@ void TKeyValueState::OnEvAcquireLock(TEvKeyValue::TEvAcquireLock::TPtr &ev, cons
CountRequestIncoming(requestType);
if (PrepareAcquireLockRequest(ctx, ev, intermediate)) {
- ++InFlightForStep[StoredState.GetChannelStep()];
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create AcquireLock request, Marker# KV80");
RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
@@ -3094,7 +3045,6 @@ void TKeyValueState::OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActor
if (PrepareIntermediate(ev, intermediate, requestType, ctx, info)) {
// Spawn KeyValueStorageRequest actor on the same thread
- ++InFlightForStep[StoredState.GetChannelStep()];
if (requestType == TRequestType::WriteOnly) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " Create storage request for WO, Marker# KV42");
@@ -3118,11 +3068,10 @@ void TKeyValueState::OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActor
}
CountRequestTakeOffOrEnqueue(requestType);
-
} else {
intermediate->UpdateStat();
-
CountRequestOtherError(requestType);
+ CancelInFlight(intermediate->RequestUid);
}
}
diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h
index fc559b3d363..e7749f33904 100644
--- a/ydb/core/keyvalue/keyvalue_state.h
+++ b/ydb/core/keyvalue/keyvalue_state.h
@@ -13,6 +13,7 @@
#include "keyvalue_simple_db.h"
#include "channel_balancer.h"
#include <util/generic/set.h>
+#include <util/generic/hash_multi_map.h>
#include <ydb/core/base/appdata.h>
#include <ydb/public/lib/base/msgbus.h>
#include <ydb/core/tablet/tablet_counters.h>
@@ -254,8 +255,9 @@ protected:
THashMap<TLogoBlobID, ui32> RefCounts;
TSet<TLogoBlobID> Trash;
TMap<ui64, ui64> InFlightForStep;
+ TMap<std::tuple<ui64, ui32>, ui32> RequestUidStepToCount;
THashMap<ui64, TInstant> RequestInputTime;
- ui64 NextRequestUid = 0;
+ ui64 NextRequestUid = 1;
TIntrusivePtr<TCollectOperation> CollectOperation;
bool IsCollectEventSent;
bool IsSpringCleanupDone;
@@ -338,29 +340,16 @@ public:
// garbage collection methods
void PrepareCollectIfNeeded(const TActorContext &ctx);
- void RemoveFromTrashDoNotKeep(ISimpleDb &db, const TActorContext &ctx, const TVector<TLogoBlobID> &collectedDoNotKeep);
- void RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep);
+ bool RemoveCollectedTrash(ISimpleDb &db, const TActorContext &ctx);
void UpdateStoredState(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep);
- void UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState);
- void UpdateAfterPartialGC(ISimpleDb &db, const TActorContext &ctx);
- void StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx);
- void StoreCollectComplete(const TActorContext &ctx);
- void EraseCollectExecute(ISimpleDb &db, const TActorContext &ctx);
- void EraseCollectComplete(const TActorContext &ctx);
void CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx);
void CompleteGCComplete(const TActorContext &ctx);
- void PartialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx);
- void PartialCompleteGCComplete(const TActorContext &ctx);
- void SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
- TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep);
- void StartGC(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
- TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep);
+ void StartGC(const TActorContext &ctx, TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep,
+ TVector<TLogoBlobID>& trashGoingToCollect);
void StartCollectingIfPossible(const TActorContext &ctx);
ui64 OnEvCollect(const TActorContext &ctx);
void OnEvCollectDone(ui64 perGenerationCounterStepSize, TActorId collector, const TActorContext &ctx);
- void OnEvEraseCollect(const TActorContext &ctx);
void OnEvCompleteGC();
- void OnEvPartialCompleteGC(TEvKeyValue::TEvPartialCompleteGC *ev);
void Reply(THolder<TIntermediate> &intermediate, const TActorContext &ctx, const TTabletStorageInfo *info);
@@ -434,7 +423,7 @@ public:
const TTabletStorageInfo* /*info*/);
void Step();
- TLogoBlobID AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx);
+ TLogoBlobID AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx, ui64 requestUid);
TIntrusivePtr<TCollectOperation>& GetCollectOperation() {
return CollectOperation;
}
@@ -463,6 +452,7 @@ public:
void OnRequestComplete(ui64 requestUid, ui64 generation, ui64 step, const TActorContext &ctx,
const TTabletStorageInfo *info, NMsgBusProxy::EResponseStatus status, const TRequestStat &stat);
+ void CancelInFlight(ui64 requestUid);
void OnEvIntermediate(TIntermediate &intermediate, const TActorContext &ctx);
void OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActorContext &ctx, const TTabletStorageInfo *info);
diff --git a/ydb/core/keyvalue/keyvalue_state_collect.cpp b/ydb/core/keyvalue/keyvalue_state_collect.cpp
index 9ca39f19fd1..18f111ea04c 100644
--- a/ydb/core/keyvalue/keyvalue_state_collect.cpp
+++ b/ydb/core/keyvalue/keyvalue_state_collect.cpp
@@ -7,7 +7,7 @@ namespace NKeyValue {
void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "PrepareCollectIfNeeded KeyValue# " << TabletId << " Marker# KV61");
- if (CollectOperation.Get() || InitialCollectsSent) {
+ if (IsCollectEventSent || InitialCollectsSent) {
// We already are trying to collect something, just pass this time.
return;
}
@@ -25,22 +25,27 @@ void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// derive new collect step for this operation
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- ui64 collectNeededStep = 0;
- if (maxId.Generation() == ExecutorGeneration) {
- collectNeededStep = maxId.Step(); // collect up to this maximum blob
- } else {
- // step 0 by default provides collection of all trash blobs up to maxId as the generation is less than current one
- Y_VERIFY(maxId.Generation() < ExecutorGeneration);
+ THelpers::TGenerationStep inflightGenStep(Max<ui32>(), Max<ui32>());
+ if (InFlightForStep) {
+ const auto& [step, _] = *InFlightForStep.begin();
+ Y_VERIFY(step);
+ inflightGenStep = THelpers::TGenerationStep(ExecutorGeneration, step - 1);
}
+ const auto storedGenStep = THelpers::TGenerationStep(StoredState.GetCollectGeneration(), StoredState.GetCollectStep());
+ const auto collectGenStep = Min(inflightGenStep, Max(storedGenStep, THelpers::GenerationStep(maxId)));
+ Y_VERIFY(THelpers::TGenerationStep(ExecutorGeneration, 0) <= collectGenStep);
+ Y_VERIFY(storedGenStep <= collectGenStep);
- // Don't let the CollectGeneration step backwards
- if (StoredState.GetCollectGeneration() == ExecutorGeneration && collectNeededStep < StoredState.GetCollectStep()) {
- collectNeededStep = StoredState.GetCollectStep();
+ // check if it is useful to start any collection
+ const TLogoBlobID minTrashId = *Trash.begin();
+ if (collectGenStep < THelpers::GenerationStep(minTrashId)) {
+ return; // we do not have the opportunity to collect anything here with this allowed barrier
}
// create basic collect operation with zero keep/doNotKeep flag vectors; they will be calculated just before sending
- CollectOperation.Reset(new TCollectOperation(ExecutorGeneration, collectNeededStep, {} /* keep */, {} /* doNoKeep */));
- if (collectNeededStep == NextLogoBlobStep) {
+ CollectOperation.Reset(new TCollectOperation(std::get<0>(collectGenStep), std::get<1>(collectGenStep),
+ {} /* keep */, {} /* doNoKeep */, {} /* trashGoingToCollect */));
+ if (std::get<1>(collectGenStep) == NextLogoBlobStep) {
// advance to the next step if we are going to collect everything up to current one; otherwise we can keep
// current step
Step();
@@ -49,48 +54,26 @@ void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) {
StartCollectingIfPossible(ctx);
}
+bool TKeyValueState::RemoveCollectedTrash(ISimpleDb &db, const TActorContext &ctx) {
+ if (auto& trash = CollectOperation->TrashGoingToCollect) {
+ ui32 collected = 0;
+ for (ui32 maxItemsToStore = 10'000; trash && maxItemsToStore; trash.pop_back(), --maxItemsToStore) {
+ const TLogoBlobID& id = trash.back();
+ THelpers::DbEraseTrash(id, db, ctx);
+ ui32 num = Trash.erase(id);
+ Y_VERIFY(num == 1);
+ CountTrashCollected(id.BlobSize());
+ ++collected;
+ }
-void TKeyValueState::RemoveFromTrashDoNotKeep(ISimpleDb &db, const TActorContext &ctx,
- const TVector<TLogoBlobID> &collectedDoNotKeep)
-{
- for (const TLogoBlobID &id : collectedDoNotKeep) {
- THelpers::DbEraseTrash(id, db, ctx);
- ui32 num = Trash.erase(id);
- Y_VERIFY(num == 1);
- CountTrashCollected(id.BlobSize());
- }
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC24, "Remove from Trash",
- (TabletId, TabletId),
- (RemovedCount, collectedDoNotKeep.size()),
- (TrashCount, Trash.size()));
-}
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC24, "Remove from Trash",
+ (TabletId, TabletId), (RemovedCount, collected), (TrashCount, Trash.size()));
-void TKeyValueState::RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorContext &ctx,
- const NKeyValue::THelpers::TGenerationStep &genStep)
-{
- ui64 storedCollectGeneration = StoredState.GetCollectGeneration();
- ui64 storedCollectStep = StoredState.GetCollectStep();
-
- // remove trash entries that were not marked as 'Keep' before, but which are automatically deleted by this barrier
- // to prevent them from being added to 'DoNotKeep' list after
- ui32 counter = 0;
- for (auto it = Trash.begin(); it != Trash.end(); ) {
- THelpers::TGenerationStep trashGenStep = THelpers::GenerationStep(*it);
- bool afterStoredSoftBarrier = trashGenStep > THelpers::TGenerationStep(storedCollectGeneration, storedCollectStep);
- bool beforeSoftBarrier = trashGenStep <= genStep;
- if (afterStoredSoftBarrier && beforeSoftBarrier) {
- CountTrashCollected(it->BlobSize());
- THelpers::DbEraseTrash(*it, db, ctx);
- it = Trash.erase(it);
- if (++counter >= MaxCollectGarbageFlagsPerMessage) {
- RepeatGCTX = true;
- break;
- }
- } else {
- ++it;
- }
+ return trash.empty();
}
+
+ return true;
}
void TKeyValueState::UpdateStoredState(ISimpleDb &db, const TActorContext &ctx,
@@ -101,90 +84,15 @@ void TKeyValueState::UpdateStoredState(ISimpleDb &db, const TActorContext &ctx,
THelpers::DbUpdateState(StoredState, db, ctx);
}
-void TKeyValueState::UpdateAfterPartialGC(ISimpleDb &db, const TActorContext &ctx) {
- RemoveFromTrashDoNotKeep(db, ctx, PartialCollectedDoNotKeep);
-}
-
-void TKeyValueState::UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState) {
- if (IsDamaged) {
- return;
- }
- Y_VERIFY(CollectOperation);
-
- ui64 collectGeneration = CollectOperation->Header.GetCollectGeneration();
- ui64 collectStep = CollectOperation->Header.GetCollectStep();
- auto collectGenStep = THelpers::TGenerationStep(collectGeneration, collectStep);
-
- if (updateTrash) {
- for (TLogoBlobID &id: CollectOperation->DoNotKeep) {
- THelpers::DbEraseTrash(id, db, ctx);
- ui32 num = Trash.erase(id);
- Y_VERIFY(num == 1);
- CountTrashCollected(id.BlobSize());
- }
-
- RemoveFromTrashBySoftBarrier(db, ctx, collectGenStep);
- }
-
- if (updateState) {
+void TKeyValueState::CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) {
+ if (RemoveCollectedTrash(db, ctx)) {
+ const ui32 collectGeneration = CollectOperation->Header.GetCollectGeneration();
+ const ui32 collectStep = CollectOperation->Header.GetCollectStep();
+ auto collectGenStep = THelpers::TGenerationStep(collectGeneration, collectStep);
UpdateStoredState(db, ctx, collectGenStep);
+ } else {
+ RepeatGCTX = true;
}
-
-}
-
-void TKeyValueState::StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx) {
- LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "StoreCollectExecute KeyValue# " << TabletId
- << " IsDamaged# " << IsDamaged << " Marker# KV62");
-
- if (IsDamaged) {
- return;
- }
- Y_VERIFY(CollectOperation.Get());
-
- // This operation will be executed no matter what
- const ui32 collectGen = CollectOperation->Header.GetCollectGeneration();
- const ui32 collectStep = CollectOperation->Header.GetCollectStep();
- THelpers::DbUpdateCollect(collectGen,
- collectStep,
- CollectOperation->Keep,
- CollectOperation->DoNotKeep,
- db, ctx);
-
- UpdateGC(db, ctx, true, false);
-}
-
-void TKeyValueState::StoreCollectComplete(const TActorContext &ctx) {
- ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCollect());
-}
-
-void TKeyValueState::EraseCollectExecute(ISimpleDb &db, const TActorContext &ctx) {
- LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "EraseCollectExecute KeyValue# " << TabletId
- << " IsDamaged# " << IsDamaged << " Marker# KV63");
- if (IsDamaged) {
- return;
- }
- Y_VERIFY(CollectOperation);
- // Erase the collect operation
- THelpers::DbEraseCollect(db, ctx);
- // Update the state
- UpdateGC(db, ctx, false, true);
-}
-
-void TKeyValueState::EraseCollectComplete(const TActorContext &ctx) {
- Y_VERIFY(CollectOperation);
- CollectOperation.Reset(nullptr);
- IsCollectEventSent = false;
-
- // Start new collect operation if needed
- PrepareCollectIfNeeded(ctx);
-}
-
-void TKeyValueState::CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) {
- ui64 collectGeneration = CollectOperation->Header.GetCollectGeneration();
- ui64 collectStep = CollectOperation->Header.GetCollectStep();
- auto collectGenStep = THelpers::TGenerationStep(collectGeneration, collectStep);
- RemoveFromTrashBySoftBarrier(db, ctx, collectGenStep);
- UpdateStoredState(db, ctx, collectGenStep);
}
void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) {
@@ -205,35 +113,21 @@ void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) {
PrepareCollectIfNeeded(ctx);
}
-
-void TKeyValueState::PartialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) {
- UpdateAfterPartialGC(db, ctx);
-}
-
-void TKeyValueState::PartialCompleteGCComplete(const TActorContext &ctx) {
- PartialCollectedDoNotKeep.clear();
- STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC23, "Send ContinueGC",
- (TabletId, TabletId),
- (TrashCount, Trash.size()));
- ctx.Send(CollectorActorId, new TEvKeyValue::TEvContinueGC(std::move(PartialCollectedDoNotKeep)));
-}
-
-// Prepare the completely new full collect operation with the same gen/step, but with correct keep & doNotKeep lists
-void TKeyValueState::SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
- TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep) {
- ui32 generation, step;
- std::tie(generation, step) = genStep;
- CollectOperation.Reset(new TCollectOperation(generation, step, std::move(keep), std::move(doNotKeep)));
- ctx.Send(KeyValueActorId, new TEvKeyValue::TEvStoreCollect());
-}
-
-void TKeyValueState::StartGC(const TActorContext &ctx, const THelpers::TGenerationStep &genStep,
- TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep)
-{
- ui32 generation, step;
- std::tie(generation, step) = genStep;
- CollectOperation.Reset(new TCollectOperation(generation, step, std::move(keep), std::move(doNotKeep)));
+void TKeyValueState::StartGC(const TActorContext &ctx, TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep,
+ TVector<TLogoBlobID>& trashGoingToCollect) {
+ // ensure we haven't filled these fields yet
+ Y_VERIFY(CollectOperation);
+ Y_VERIFY(!CollectOperation->Keep);
+ Y_VERIFY(!CollectOperation->DoNotKeep);
+ Y_VERIFY(!CollectOperation->TrashGoingToCollect);
+ // fill in correct values
+ CollectOperation->Keep = std::move(keep);
+ CollectOperation->DoNotKeep = std::move(doNotKeep);
+ CollectOperation->TrashGoingToCollect = std::move(trashGoingToCollect);
+ // issue command to collector
ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCollect());
+ Y_VERIFY(!IsCollectEventSent);
+ IsCollectEventSent = true;
}
void TKeyValueState::StartCollectingIfPossible(const TActorContext &ctx) {
@@ -241,9 +135,7 @@ void TKeyValueState::StartCollectingIfPossible(const TActorContext &ctx) {
<< " IsCollectEventSent# " << IsCollectEventSent << " Marker# KV64");
// there is nothing to collect yet, or the event was already sent
- if (!CollectOperation || IsCollectEventSent) {
- return;
- }
+ Y_VERIFY(CollectOperation && !IsCollectEventSent);
// create generation:step barrier tuple for proposed garbage collection command
const auto &header = CollectOperation->Header;
@@ -251,19 +143,10 @@ void TKeyValueState::StartCollectingIfPossible(const TActorContext &ctx) {
// if we have some in flight writes, check if they do not overlap with the new barrier
if (InFlightForStep) {
- // get the first item from the map with the least value; its key contains step of blob being written
- auto leastWriteIt = InFlightForStep.begin();
- // construct generation:step pair of the oldest in flight blob
- auto inFlightGenStep = THelpers::TGenerationStep(ExecutorGeneration, leastWriteIt->first);
- // check if the new barrier would delete blob being written, in this case hold with the collect operation
- if (inFlightGenStep <= collectGenStep) {
- return;
- }
+ const auto& [step, _] = *InFlightForStep.begin();
+ Y_VERIFY(collectGenStep < THelpers::TGenerationStep(ExecutorGeneration, step));
}
- Y_VERIFY(!IsCollectEventSent);
- IsCollectEventSent = true;
-
// create stored (previously issued) generation:step barrier as a tuple
auto storedGenStep = THelpers::TGenerationStep(StoredState.GetCollectGeneration(), StoredState.GetCollectStep());
@@ -283,18 +166,30 @@ void TKeyValueState::StartCollectingIfPossible(const TActorContext &ctx) {
// create list of blobs that must have to DoNotKeep flag set; these blobs must have Keep flag written and reside in
// Trash now
TVector<TLogoBlobID> doNotKeep;
+ TVector<TLogoBlobID> trashGoingToCollect;
doNotKeep.reserve(Trash.size());
- for (const TLogoBlobID &id : Trash) {
- if (THelpers::GenerationStep(id) <= storedGenStep) {
+ trashGoingToCollect.reserve(Trash.size());
+
+ for (const TLogoBlobID& id : Trash) {
+ auto genStep = THelpers::GenerationStep(id);
+ if (collectGenStep < genStep) {
+ break;
+ }
+ if (genStep <= storedGenStep || id.Generation() < ExecutorGeneration) { // assume Keep flag was issued to these blobs
doNotKeep.push_back(id);
}
+ Y_VERIFY(genStep <= collectGenStep);
+ trashGoingToCollect.push_back(id);
}
doNotKeep.shrink_to_fit();
+ trashGoingToCollect.shrink_to_fit();
+
+ Y_VERIFY(trashGoingToCollect);
LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "StartCollectingIfPossible KeyValue# " << TabletId
<< "Flags Keep.Size# " << keep.size() << " DoNotKeep.Size# " << doNotKeep.size() << " Marker# KV65");
- StartGC(ctx, collectGenStep, keep, doNotKeep);
+ StartGC(ctx, keep, doNotKeep, trashGoingToCollect);
}
ui64 TKeyValueState::OnEvCollect(const TActorContext &ctx) {
@@ -317,18 +212,9 @@ void TKeyValueState::OnEvCollectDone(ui64 perGenerationCounterStepSize, TActorId
CollectorActorId = collector;
}
-void TKeyValueState::OnEvEraseCollect(const TActorContext &ctx) {
- Y_UNUSED(ctx);
- CountLatencyBsCollect();
-}
-
void TKeyValueState::OnEvCompleteGC() {
CountLatencyBsCollect();
}
-void TKeyValueState::OnEvPartialCompleteGC(TEvKeyValue::TEvPartialCompleteGC *ev) {
- PartialCollectedDoNotKeep = std::move(ev->CollectedDoNotKeep);
-}
-
} // NKeyValue
} // NKikimr
diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp
index a8babc46c12..2f2d8118f08 100644
--- a/ydb/core/keyvalue/keyvalue_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_ut.cpp
@@ -1125,9 +1125,6 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI
savedInitialEvents.pop();
}
- TDispatchOptions options3;
- options3.FinalEvents.push_back(TEvKeyValue::TEvEraseCollect::EventType);
-
TestLog("Third dispatch ", collectStep);
UNIT_ASSERT_VALUES_EQUAL(collectStep, 2);