diff options
author | alexvru <alexvru@ydb.tech> | 2023-07-20 15:59:55 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-07-20 15:59:55 +0300 |
commit | cb49856f9541f9089e06bbd91524ba418919037d (patch) | |
tree | e38f111ea3919a34de7b5c779381eb9e5ce1451a | |
parent | 629dca0f6d0e10c2a76afdebaa75bd9042b8eaaa (diff) | |
download | ydb-cb49856f9541f9089e06bbd91524ba418919037d.tar.gz |
Fix blob leakage KIKIMR-18784
-rw-r--r-- | ydb/core/keyvalue/keyvalue_collect_operation.h | 4 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_collector.cpp | 32 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_collector_ut.cpp | 47 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_events.h | 28 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_flat_impl.h | 26 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 107 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 26 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state_collect.cpp | 258 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_ut.cpp | 3 |
9 files changed, 126 insertions, 405 deletions
diff --git a/ydb/core/keyvalue/keyvalue_collect_operation.h b/ydb/core/keyvalue/keyvalue_collect_operation.h index 49c67c34cf7..1fd19da441a 100644 --- a/ydb/core/keyvalue/keyvalue_collect_operation.h +++ b/ydb/core/keyvalue/keyvalue_collect_operation.h @@ -50,12 +50,14 @@ struct TCollectOperation : public TThrRefBase { TCollectOperationHeader Header; TVector<TLogoBlobID> Keep; TVector<TLogoBlobID> DoNotKeep; + TVector<TLogoBlobID> TrashGoingToCollect; TCollectOperation(ui64 collectGeneration, ui64 collectStep, - TVector<TLogoBlobID> &&keep, TVector<TLogoBlobID> &&doNotKeep) + TVector<TLogoBlobID> &&keep, TVector<TLogoBlobID> &&doNotKeep, TVector<TLogoBlobID>&& trashGoingToCollect) : Header(collectGeneration, collectStep, keep, doNotKeep) , Keep(std::move(keep)) , DoNotKeep(std::move(doNotKeep)) + , TrashGoingToCollect(std::move(trashGoingToCollect)) {} }; diff --git a/ydb/core/keyvalue/keyvalue_collector.cpp b/ydb/core/keyvalue/keyvalue_collector.cpp index b4c72dbd148..eeb2585ff84 100644 --- a/ydb/core/keyvalue/keyvalue_collector.cpp +++ b/ydb/core/keyvalue/keyvalue_collector.cpp @@ -34,9 +34,6 @@ class TKeyValueCollector : public TActorBootstrapped<TKeyValueCollector> { TVector<TMap<ui32, TGroupCollector>> CollectorForGroupForChannel; ui32 EndChannel = 0; - // For DoNotKeep - TVector<TLogoBlobID> CollectedDoNotKeep; - public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KEYVALUE_ACTOR; @@ -106,7 +103,6 @@ public: maxDoNotKeepSizeInGroupChannel = Max(maxDoNotKeepSizeInGroupChannel, collector.DoNotKeep.size()); } } - CollectedDoNotKeep.reserve(Min(maxDoNotKeepSizeInGroupChannel, MaxCollectGarbageFlagsPerMessage)); SendTheRequest(); Become(&TThis::StateWait); @@ -158,10 +154,6 @@ public: (TabletId, TabletInfo->TabletID), (Channel, GetCurretChannelId())); CollectorForGroupForChannel.pop_back(); } - if (CollectedDoNotKeep.size()) { - SendPartialCompleteGC(); - return; - } if (CollectorForGroupForChannel.empty()) { SendCompleteGCAndDie(); return; @@ -196,14 +188,6 @@ public: } } - void SendPartialCompleteGC() { - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC14, "Collector send PartialCompleteGC", - (TabletId, TabletInfo->TabletID), - (FirstDoNotKeep, (CollectedDoNotKeep.size() ? CollectedDoNotKeep[0].ToString() : "none")), - (CollectedDoNotKeepSize, CollectedDoNotKeep.size())); - Send(KeyValueActorId, new TEvKeyValue::TEvPartialCompleteGC(std::move(CollectedDoNotKeep))); - } - void SendCompleteGCAndDie() { STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC19, "Collector send CompleteGC", (TabletId, TabletInfo->TabletID)); @@ -251,9 +235,6 @@ public: collector.NextCountOfSentFlags += doNotKeepSize; Copy(begin, end, doNotKeep->begin()); - if (!IsRepeatedRequest) { - Copy(doNotKeep->cbegin(), doNotKeep->cend(), std::back_inserter(CollectedDoNotKeep)); - } } ui32 keepStartIdx = 0; @@ -298,18 +279,6 @@ public: IsRepeatedRequest = false; } - void HandleContinueGC(TEvKeyValue::TEvContinueGC::TPtr &ev) { - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC13, "Collector continue GC", - (TabletId, TabletInfo->TabletID)); - if (CollectorForGroupForChannel.empty()) { - SendCompleteGCAndDie(); - return; - } - CollectedDoNotKeep = std::move(ev->Get()->Buffer); - CollectedDoNotKeep.clear(); - SendTheRequest(); - } - void HandleWakeUp() { auto collectorsOfCurrentChannel = CollectorForGroupForChannel.rbegin(); if (collectorsOfCurrentChannel == CollectorForGroupForChannel.rend()) { @@ -352,7 +321,6 @@ public: STATEFN(StateWait) { switch (ev->GetTypeRewrite()) { hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle); - hFunc(TEvKeyValue::TEvContinueGC, HandleContinueGC); cFunc(TEvents::TEvWakeup::EventType, HandleWakeUp); cFunc(TEvents::TEvPoisonPill::EventType, HandlePoisonPill); default: diff --git a/ydb/core/keyvalue/keyvalue_collector_ut.cpp b/ydb/core/keyvalue/keyvalue_collector_ut.cpp index 94d8314b8e9..a7926ca3b8c 100644 --- a/ydb/core/keyvalue/keyvalue_collector_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_collector_ut.cpp @@ -117,7 +117,7 @@ Y_UNIT_TEST(TestKeyValueCollectorEmpty) { TVector<TLogoBlobID> keep; TVector<TLogoBlobID> doNotKeep; - TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep))); + TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep), {})); context.SetActor(CreateKeyValueCollector( context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true)); @@ -144,7 +144,7 @@ Y_UNIT_TEST(TestKeyValueCollectorSingle) { TVector<TLogoBlobID> keep; keep.emplace_back(0x10010000001000Bull, 5, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10); TVector<TLogoBlobID> doNotKeep; - TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep))); + TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep), {})); context.SetActor(CreateKeyValueCollector( context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true)); @@ -180,7 +180,7 @@ Y_UNIT_TEST(TestKeyValueCollectorSingleWithOneError) { TVector<TLogoBlobID> keep; keep.emplace_back(0x10010000001000Bull, 5, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10); TVector<TLogoBlobID> doNotKeep; - TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep))); + TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep), {})); context.SetActor(CreateKeyValueCollector( context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true)); @@ -242,7 +242,7 @@ Y_UNIT_TEST(TestKeyValueCollectorMultiple) { ids.insert(doNotKeep[i]); } - TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep))); + TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep), {})); context.SetActor(CreateKeyValueCollector( context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true)); @@ -275,16 +275,6 @@ Y_UNIT_TEST(TestKeyValueCollectorMultiple) { context.Send(new TEvBlobStorage::TEvCollectGarbageResult(NKikimrProto::OK, collect->TabletId, collect->RecordGeneration, collect->PerGenerationCounter, collect->Channel)); - - if (collect && collect->DoNotKeep && collect->DoNotKeep->size()) { - auto complete = context.GrabEvent<TEvKeyValue::TEvPartialCompleteGC>(handle); - THashSet<TLogoBlobID> uniqueBlobs; - uniqueBlobs.insert(complete->CollectedDoNotKeep.begin(), complete->CollectedDoNotKeep.end()); - UNIT_ASSERT_VALUES_EQUAL(uniqueBlobs.size(), complete->CollectedDoNotKeep.size()); - complete->CollectedDoNotKeep.clear(); - auto cont = std::make_unique<TEvKeyValue::TEvContinueGC>(std::move(complete->CollectedDoNotKeep)); - context.Send(cont.release()); - } } UNIT_ASSERT(erased == 8); @@ -307,11 +297,11 @@ Y_UNIT_TEST(TestKeyValueCollectorMany) { keep.emplace_back(0x10010000001000Bull, idx, 58949, NKeyValue::BLOB_CHANNEL, 1209816, 10); } - TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep))); + TIntrusivePtr<NKeyValue::TCollectOperation> operation(new NKeyValue::TCollectOperation(100, 100, std::move(keep), std::move(doNotKeep), {})); context.SetActor(CreateKeyValueCollector( context.GetTabletActorId(), operation, context.GetTabletInfo().Get(), 200, 200, true)); - auto handleGC = [&](bool withContinueGC, ui32 keepSize, ui32 doNotKeepSize) { + auto handleGC = [&](ui32 keepSize, ui32 doNotKeepSize) { TAutoPtr<IEventHandle> handle; auto collect = context.GrabEvent<TEvBlobStorage::TEvCollectGarbage>(handle); UNIT_ASSERT(collect); @@ -319,24 +309,17 @@ Y_UNIT_TEST(TestKeyValueCollectorMany) { UNIT_ASSERT_VALUES_EQUAL((collect->DoNotKeep ? collect->DoNotKeep->size() : 0), doNotKeepSize); context.Send(new TEvBlobStorage::TEvCollectGarbageResult(NKikimrProto::OK, collect->TabletId, collect->RecordGeneration, collect->PerGenerationCounter, collect->Channel)); - - if (withContinueGC) { - auto complete = context.GrabEvent<TEvKeyValue::TEvPartialCompleteGC>(handle); - complete->CollectedDoNotKeep.clear(); - auto cont = std::make_unique<TEvKeyValue::TEvContinueGC>(std::move(complete->CollectedDoNotKeep)); - context.Send(cont.release()); - } }; - handleGC(true, 20, 20); // group 0 - handleGC(true, 10, 10); // group 1 - handleGC(true, 0, 10'000); // group 2 DoNotKeep 30..10029 - handleGC(true, 30, 9'970); // group 2 DoNotKeep 10030..19999 Keep 30.59 - handleGC(false, 10'000, 0); // group 2 Keep 60..10059 - handleGC(false, 9'940, 0); // group 2 Keep 10060..20000 - handleGC(false, 0, 0); // group 3 - handleGC(false, 0, 0); // group 4 - handleGC(false, 0, 0); // group 5 + handleGC(20, 20); // group 0 + handleGC(10, 10); // group 1 + handleGC(0, 10'000); // group 2 DoNotKeep 30..10029 + handleGC(30, 9'970); // group 2 DoNotKeep 10030..19999 Keep 30.59 + handleGC(10'000, 0); // group 2 Keep 60..10059 + handleGC(9'940, 0); // group 2 Keep 10060..20000 + handleGC(0, 0); // group 3 + handleGC(0, 0); // group 4 + handleGC(0, 0); // group 5 TAutoPtr<IEventHandle> handle; auto eraseCollect = context.GrabEvent<TEvKeyValue::TEvCompleteGC>(handle); diff --git a/ydb/core/keyvalue/keyvalue_events.h b/ydb/core/keyvalue/keyvalue_events.h index 95ccf72ef50..2c64e120e30 100644 --- a/ydb/core/keyvalue/keyvalue_events.h +++ b/ydb/core/keyvalue/keyvalue_events.h @@ -20,13 +20,10 @@ struct TEvKeyValue { EvNotify, EvStoreCollect, EvCollect, - EvEraseCollect, EvPeriodicRefresh, EvReportWriteLatency, EvUpdateWeights, EvCompleteGC, - EvPartialCompleteGC, - EvContinueGC, EvRead = EvRequest + 16, EvReadRange, @@ -185,18 +182,10 @@ struct TEvKeyValue { } }; - struct TEvStoreCollect : public TEventLocal<TEvStoreCollect, EvStoreCollect> { - TEvStoreCollect() { } - }; - struct TEvCollect : public TEventLocal<TEvCollect, EvCollect> { TEvCollect() { } }; - struct TEvEraseCollect : public TEventLocal<TEvEraseCollect, EvEraseCollect> { - TEvEraseCollect() { } - }; - struct TEvPeriodicRefresh : public TEventLocal<TEvPeriodicRefresh, EvPeriodicRefresh> { TEvPeriodicRefresh() { } }; @@ -204,23 +193,6 @@ struct TEvKeyValue { struct TEvCompleteGC : public TEventLocal<TEvCompleteGC, EvCompleteGC> { TEvCompleteGC() { } }; - - struct TEvPartialCompleteGC : public TEventLocal<TEvPartialCompleteGC, EvPartialCompleteGC> { - TVector<TLogoBlobID> CollectedDoNotKeep; - - TEvPartialCompleteGC() { } - - TEvPartialCompleteGC(TVector<TLogoBlobID> &&doNotKeeps) - : CollectedDoNotKeep(std::move(doNotKeeps)) - { } - }; - - struct TEvContinueGC : public TEventLocal<TEvContinueGC, EvContinueGC> { - TVector<TLogoBlobID> Buffer; - TEvContinueGC(TVector<TLogoBlobID> &&buffer) - : Buffer(std::move(buffer)) - { } - }; }; } // NKikimr diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h index d5f6211438e..5e97a383c1d 100644 --- a/ydb/core/keyvalue/keyvalue_flat_impl.h +++ b/ydb/core/keyvalue/keyvalue_flat_impl.h @@ -265,11 +265,8 @@ protected: } #endif - KV_SIMPLE_TX(StoreCollect); - KV_SIMPLE_TX(EraseCollect); KV_SIMPLE_TX(RegisterInitialGCCompletion); KV_SIMPLE_TX(CompleteGC); - KV_SIMPLE_TX(PartialCompleteGC); TKeyValueState State; TDeque<TAutoPtr<IEventHandle>> InitialEventsQueue; @@ -332,13 +329,6 @@ protected: //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Online state - void Handle(TEvKeyValue::TEvEraseCollect::TPtr &ev, const TActorContext &ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() - << " Handle TEvEraseCollect " << ev->Get()->ToString()); - State.OnEvEraseCollect(ctx); - Execute(new TTxEraseCollect(this), ctx); - } - void Handle(TEvKeyValue::TEvCompleteGC::TPtr &ev, const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() << " Handle TEvCompleteGC " << ev->Get()->ToString()); @@ -346,13 +336,6 @@ protected: Execute(new TTxCompleteGC(this), ctx); } - void Handle(TEvKeyValue::TEvPartialCompleteGC::TPtr &ev, const TActorContext &ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() - << " Handle TEvPartitialCompleteGC " << ev->Get()->ToString()); - State.OnEvPartialCompleteGC(ev->Get()); - Execute(new TTxPartialCompleteGC(this), ctx); - } - void Handle(TEvKeyValue::TEvCollect::TPtr &ev, const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() << " Handle TEvCollect " << ev->Get()->ToString()); @@ -376,12 +359,6 @@ protected: } } - void Handle(TEvKeyValue::TEvStoreCollect::TPtr &ev, const TActorContext &ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() - << " Handle TEvStoreCollect " << ev->Get()->ToString()); - Execute(new TTxStoreCollect(this), ctx); - } - void CheckYellowChannels(TRequestStat& stat) { IExecutor* executor = Executor(); if ((stat.YellowMoveChannels || stat.YellowStopChannels) && executor) { @@ -545,11 +522,8 @@ public: hFunc(TEvKeyValue::TEvGetStorageChannelStatus, Handle); hFunc(TEvKeyValue::TEvAcquireLock, Handle); - HFunc(TEvKeyValue::TEvEraseCollect, Handle); HFunc(TEvKeyValue::TEvCompleteGC, Handle); - HFunc(TEvKeyValue::TEvPartialCompleteGC, Handle); HFunc(TEvKeyValue::TEvCollect, Handle); - HFunc(TEvKeyValue::TEvStoreCollect, Handle); HFunc(TEvKeyValue::TEvRequest, Handle); HFunc(TEvKeyValue::TEvIntermediate, Handle); HFunc(TEvKeyValue::TEvNotify, Handle); diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index 3d162cb7b73..9ea2fb90864 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -438,28 +438,8 @@ void TKeyValueState::Load(const TString &key, const TString& value) { break; } case EIT_COLLECT: { + // just ignore the record Y_VERIFY(arbitraryPart.size() == 0); - Y_VERIFY(!CollectOperation.Get()); - Y_VERIFY(value.size() >= sizeof(TCollectOperationHeader)); - const TCollectOperationHeader *header = (const TCollectOperationHeader*)value.data(); - Y_VERIFY(header->DataHeader.ItemType == EIT_COLLECT); - ui64 totalSize = sizeof(TCollectOperationHeader) - + sizeof(TLogoBlobID) * (header->KeepCount + header->DoNotKeepCount); - Y_VERIFY(value.size() == totalSize); - TVector<TLogoBlobID> keep; - TVector<TLogoBlobID> doNotKeep; - keep.resize(header->KeepCount); - doNotKeep.resize(header->DoNotKeepCount); - const char* data = value.data() + sizeof(TCollectOperationHeader); - if (keep.size()) { - memcpy((char *) &keep[0], data, sizeof(TLogoBlobID) * keep.size()); - } - data += sizeof(TLogoBlobID) * keep.size(); - if (doNotKeep.size()) { - memcpy((char *) &doNotKeep[0], data, sizeof(TLogoBlobID) * doNotKeep.size()); - } - CollectOperation.Reset(new TCollectOperation( - header->CollectGeneration, header->CollectStep, std::move(keep), std::move(doNotKeep))); break; } case EIT_STATE: { @@ -601,12 +581,6 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e } } - if (CollectOperation) { - for (const TLogoBlobID &id : CollectOperation->Keep) { - addBlobToKeep(id); - } - } - for (const auto &channelInfo : info->Channels) { if (channelInfo.Channel < BLOB_CHANNEL) { continue; @@ -630,10 +604,11 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e std::tie(group, channel) = keepInfo.first; THolder<TVector<TLogoBlobID>>& keep = keepInfo.second; + const ui32 step = TEvBlobStorage::TEvCollectGarbage::PerGenerationCounterStepSize(keep.Get(), nullptr); auto ev = MakeHolder<TEvBlobStorage::TEvCollectGarbage>(info->TabletID, executorGeneration, PerGenerationCounter, channel, true /*collect*/, barrierGeneration, barrierStep, keep.Release(), nullptr /*doNotKeep*/, TInstant::Max(), false /*isMultiCollectAllowed*/, false /*hard*/); - ++PerGenerationCounter; + PerGenerationCounter += step; const TActorId nodeWarden = MakeBlobStorageNodeWardenID(ctx.SelfID.NodeId()); const TActorId proxy = MakeBlobStorageProxyID(group); @@ -686,40 +661,10 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e ChannelRangeSets[id.Channel()].Remove(id.Generation()); } } - if (CollectOperation) { - auto &keep = CollectOperation->Keep; - for (auto it = keep.begin(); it != keep.end(); ++it) { - const TLogoBlobID &id = *it; - if (id.Channel() < ChannelRangeSets.size()) { - ChannelRangeSets[id.Channel()].Remove(id.Generation()); - } - } - auto &doNotKeep = CollectOperation->DoNotKeep; - for (auto it = doNotKeep.begin(); it != doNotKeep.end(); ++it) { - const TLogoBlobID &id = *it; - if (id.Channel() < ChannelRangeSets.size()) { - ChannelRangeSets[id.Channel()].Remove(id.Generation()); - } - } - // Patch collect operation generation and step - Y_VERIFY(CollectOperation->Header.GetCollectGeneration() < ExecutorGeneration); - CollectOperation->Header.SetCollectGeneration(ExecutorGeneration); - CollectOperation->Header.SetCollectStep(0); - } - - if (CollectOperation) { - for (const TLogoBlobID &id : CollectOperation->DoNotKeep) { - Trash.insert(id); - THelpers::DbUpdateTrash(id, db, ctx); - } - THelpers::DbEraseCollect(db, ctx); - CollectOperation = nullptr; - } - + THelpers::DbEraseCollect(db, ctx); THelpers::DbUpdateState(StoredState, db, ctx); - // corner case, if no CollectGarbage events were sent if (InitialCollectsSent == 0) { SendCutHistory(ctx); @@ -798,7 +743,7 @@ void TKeyValueState::Step() { } } -TLogoBlobID TKeyValueState::AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx) { +TLogoBlobID TKeyValueState::AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx, ui64 requestUid) { ui32 generation = ExecutorGeneration; TLogoBlobID id(TabletId, generation, NextLogoBlobStep, storageChannelIdx, size, NextLogoBlobCookie); if (NextLogoBlobCookie < TLogoBlobID::MaxCookie) { @@ -808,6 +753,10 @@ TLogoBlobID TKeyValueState::AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx } Y_VERIFY(!CollectOperation || THelpers::GenerationStep(id) > THelpers::TGenerationStep(CollectOperation->Header.GetCollectGeneration(), CollectOperation->Header.GetCollectStep())); + if (requestUid) { + ++InFlightForStep[id.Step()]; + ++RequestUidStepToCount[std::make_tuple(requestUid, id.Step())]; + } return id; } @@ -1363,7 +1312,7 @@ void TKeyValueState::CmdTrimLeakedBlobs(THolder<TIntermediate>& intermediate, IS Y_VERIFY(it->second != 0); } else if (!Trash.count(id)) { // we found a candidate for trash if (numItems < intermediate->TrimLeakedBlobs->MaxItemsToTrim) { - LOG_WARN_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " trimming " << id.ToString()); + LOG_WARN_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " trimming " << id); Trash.insert(id); CountTrashRecord(id.BlobSize()); THelpers::DbUpdateTrash(id, db, ctx); @@ -1753,15 +1702,21 @@ void TKeyValueState::OnRequestComplete(ui64 requestUid, ui64 generation, ui64 st CountRequestTakeOffOrEnqueue(requestType); } - if (StoredState.GetChannelGeneration() == generation) { - auto it = InFlightForStep.find(step); - Y_VERIFY(it != InFlightForStep.end(), "Unexpected step# %" PRIu64, (ui64)step); - it->second--; - if (it->second == 0) { - InFlightForStep.erase(it); + CancelInFlight(requestUid); + PrepareCollectIfNeeded(ctx); +} - // Initiate Garbage collection process if needed - StartCollectingIfPossible(ctx); +void TKeyValueState::CancelInFlight(ui64 requestUid) { + for (auto it = RequestUidStepToCount.lower_bound(std::make_tuple(requestUid, 0)); it != RequestUidStepToCount.end() && + std::get<0>(it->first) == requestUid; it = RequestUidStepToCount.erase(it)) { + const auto& [requestUid, step] = it->first; + const ui32 drop = it->second; + auto stepIt = InFlightForStep.find(step); + Y_VERIFY(stepIt != InFlightForStep.end() && drop <= stepIt->second); + if (drop < stepIt->second) { + stepIt->second -= drop; + } else { + InFlightForStep.erase(stepIt); } } } @@ -2210,7 +2165,7 @@ void TKeyValueState::SplitIntoBlobs(TIntermediate::TWrite &cmd, bool isInline, u ui64 sizeRemain = cmd.Data.size(); while (sizeRemain) { ui32 blobSize = Min<ui64>(sizeRemain, 8 << 20); - cmd.LogoBlobIds.push_back(AllocateLogoBlobId(blobSize, storageChannelIdx)); + cmd.LogoBlobIds.push_back(AllocateLogoBlobId(blobSize, storageChannelIdx, intermediate->RequestUid)); sizeRemain -= blobSize; } for (const TLogoBlobID& logoBlobId : cmd.LogoBlobIds) { @@ -2337,7 +2292,7 @@ TKeyValueState::TPrepareResult TKeyValueState::InitGetStatusCommand(TIntermediat } cmd.StorageChannel = storageChannel; - cmd.LogoBlobId = AllocateLogoBlobId(1, storageChannelIdx); + cmd.LogoBlobId = AllocateLogoBlobId(1, storageChannelIdx, 0); cmd.Status = NKikimrProto::UNKNOWN; } return {false, msg}; @@ -2925,7 +2880,6 @@ void TKeyValueState::OnEvReadRequest(TEvKeyValue::TEvRead::TPtr &ev, const TActo CountRequestIncoming(requestType); if (PrepareReadRequest(ctx, ev, intermediate, &requestType)) { - ++InFlightForStep[StoredState.GetChannelStep()]; if (requestType == TRequestType::ReadOnlyInline) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create storage inline read request, Marker# KV49"); @@ -2962,7 +2916,6 @@ void TKeyValueState::OnEvReadRangeRequest(TEvKeyValue::TEvReadRange::TPtr &ev, c CountRequestIncoming(requestType); if (PrepareReadRangeRequest(ctx, ev, intermediate, &requestType)) { - ++InFlightForStep[StoredState.GetChannelStep()]; if (requestType == TRequestType::ReadOnlyInline) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create storage inline read range request, Marker# KV58"); @@ -3000,7 +2953,6 @@ void TKeyValueState::OnEvExecuteTransaction(TEvKeyValue::TEvExecuteTransaction:: CountRequestIncoming(requestType); if (PrepareExecuteTransactionRequest(ctx, ev, intermediate, info)) { - ++InFlightForStep[StoredState.GetChannelStep()]; LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create storage request for WO, Marker# KV67"); RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); @@ -3009,6 +2961,7 @@ void TKeyValueState::OnEvExecuteTransaction(TEvKeyValue::TEvExecuteTransaction:: } else { intermediate->UpdateStat(); CountRequestOtherError(requestType); + CancelInFlight(intermediate->RequestUid); } } @@ -3024,7 +2977,6 @@ void TKeyValueState::OnEvGetStorageChannelStatus(TEvKeyValue::TEvGetStorageChann CountRequestIncoming(requestType); if (PrepareGetStorageChannelStatusRequest(ctx, ev, intermediate, info)) { - ++InFlightForStep[StoredState.GetChannelStep()]; LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create GetStorageChannelStatus request, Marker# KV75"); RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); @@ -3048,7 +3000,6 @@ void TKeyValueState::OnEvAcquireLock(TEvKeyValue::TEvAcquireLock::TPtr &ev, cons CountRequestIncoming(requestType); if (PrepareAcquireLockRequest(ctx, ev, intermediate)) { - ++InFlightForStep[StoredState.GetChannelStep()]; LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create AcquireLock request, Marker# KV80"); RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration); @@ -3094,7 +3045,6 @@ void TKeyValueState::OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActor if (PrepareIntermediate(ev, intermediate, requestType, ctx, info)) { // Spawn KeyValueStorageRequest actor on the same thread - ++InFlightForStep[StoredState.GetChannelStep()]; if (requestType == TRequestType::WriteOnly) { LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " Create storage request for WO, Marker# KV42"); @@ -3118,11 +3068,10 @@ void TKeyValueState::OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActor } CountRequestTakeOffOrEnqueue(requestType); - } else { intermediate->UpdateStat(); - CountRequestOtherError(requestType); + CancelInFlight(intermediate->RequestUid); } } diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index fc559b3d363..e7749f33904 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -13,6 +13,7 @@ #include "keyvalue_simple_db.h" #include "channel_balancer.h" #include <util/generic/set.h> +#include <util/generic/hash_multi_map.h> #include <ydb/core/base/appdata.h> #include <ydb/public/lib/base/msgbus.h> #include <ydb/core/tablet/tablet_counters.h> @@ -254,8 +255,9 @@ protected: THashMap<TLogoBlobID, ui32> RefCounts; TSet<TLogoBlobID> Trash; TMap<ui64, ui64> InFlightForStep; + TMap<std::tuple<ui64, ui32>, ui32> RequestUidStepToCount; THashMap<ui64, TInstant> RequestInputTime; - ui64 NextRequestUid = 0; + ui64 NextRequestUid = 1; TIntrusivePtr<TCollectOperation> CollectOperation; bool IsCollectEventSent; bool IsSpringCleanupDone; @@ -338,29 +340,16 @@ public: // garbage collection methods void PrepareCollectIfNeeded(const TActorContext &ctx); - void RemoveFromTrashDoNotKeep(ISimpleDb &db, const TActorContext &ctx, const TVector<TLogoBlobID> &collectedDoNotKeep); - void RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep); + bool RemoveCollectedTrash(ISimpleDb &db, const TActorContext &ctx); void UpdateStoredState(ISimpleDb &db, const TActorContext &ctx, const NKeyValue::THelpers::TGenerationStep &genStep); - void UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState); - void UpdateAfterPartialGC(ISimpleDb &db, const TActorContext &ctx); - void StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx); - void StoreCollectComplete(const TActorContext &ctx); - void EraseCollectExecute(ISimpleDb &db, const TActorContext &ctx); - void EraseCollectComplete(const TActorContext &ctx); void CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx); void CompleteGCComplete(const TActorContext &ctx); - void PartialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx); - void PartialCompleteGCComplete(const TActorContext &ctx); - void SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, - TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep); - void StartGC(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, - TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep); + void StartGC(const TActorContext &ctx, TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep, + TVector<TLogoBlobID>& trashGoingToCollect); void StartCollectingIfPossible(const TActorContext &ctx); ui64 OnEvCollect(const TActorContext &ctx); void OnEvCollectDone(ui64 perGenerationCounterStepSize, TActorId collector, const TActorContext &ctx); - void OnEvEraseCollect(const TActorContext &ctx); void OnEvCompleteGC(); - void OnEvPartialCompleteGC(TEvKeyValue::TEvPartialCompleteGC *ev); void Reply(THolder<TIntermediate> &intermediate, const TActorContext &ctx, const TTabletStorageInfo *info); @@ -434,7 +423,7 @@ public: const TTabletStorageInfo* /*info*/); void Step(); - TLogoBlobID AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx); + TLogoBlobID AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx, ui64 requestUid); TIntrusivePtr<TCollectOperation>& GetCollectOperation() { return CollectOperation; } @@ -463,6 +452,7 @@ public: void OnRequestComplete(ui64 requestUid, ui64 generation, ui64 step, const TActorContext &ctx, const TTabletStorageInfo *info, NMsgBusProxy::EResponseStatus status, const TRequestStat &stat); + void CancelInFlight(ui64 requestUid); void OnEvIntermediate(TIntermediate &intermediate, const TActorContext &ctx); void OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActorContext &ctx, const TTabletStorageInfo *info); diff --git a/ydb/core/keyvalue/keyvalue_state_collect.cpp b/ydb/core/keyvalue/keyvalue_state_collect.cpp index 9ca39f19fd1..18f111ea04c 100644 --- a/ydb/core/keyvalue/keyvalue_state_collect.cpp +++ b/ydb/core/keyvalue/keyvalue_state_collect.cpp @@ -7,7 +7,7 @@ namespace NKeyValue { void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) { LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "PrepareCollectIfNeeded KeyValue# " << TabletId << " Marker# KV61"); - if (CollectOperation.Get() || InitialCollectsSent) { + if (IsCollectEventSent || InitialCollectsSent) { // We already are trying to collect something, just pass this time. return; } @@ -25,22 +25,27 @@ void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // derive new collect step for this operation //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - ui64 collectNeededStep = 0; - if (maxId.Generation() == ExecutorGeneration) { - collectNeededStep = maxId.Step(); // collect up to this maximum blob - } else { - // step 0 by default provides collection of all trash blobs up to maxId as the generation is less than current one - Y_VERIFY(maxId.Generation() < ExecutorGeneration); + THelpers::TGenerationStep inflightGenStep(Max<ui32>(), Max<ui32>()); + if (InFlightForStep) { + const auto& [step, _] = *InFlightForStep.begin(); + Y_VERIFY(step); + inflightGenStep = THelpers::TGenerationStep(ExecutorGeneration, step - 1); } + const auto storedGenStep = THelpers::TGenerationStep(StoredState.GetCollectGeneration(), StoredState.GetCollectStep()); + const auto collectGenStep = Min(inflightGenStep, Max(storedGenStep, THelpers::GenerationStep(maxId))); + Y_VERIFY(THelpers::TGenerationStep(ExecutorGeneration, 0) <= collectGenStep); + Y_VERIFY(storedGenStep <= collectGenStep); - // Don't let the CollectGeneration step backwards - if (StoredState.GetCollectGeneration() == ExecutorGeneration && collectNeededStep < StoredState.GetCollectStep()) { - collectNeededStep = StoredState.GetCollectStep(); + // check if it is useful to start any collection + const TLogoBlobID minTrashId = *Trash.begin(); + if (collectGenStep < THelpers::GenerationStep(minTrashId)) { + return; // we do not have the opportunity to collect anything here with this allowed barrier } // create basic collect operation with zero keep/doNotKeep flag vectors; they will be calculated just before sending - CollectOperation.Reset(new TCollectOperation(ExecutorGeneration, collectNeededStep, {} /* keep */, {} /* doNoKeep */)); - if (collectNeededStep == NextLogoBlobStep) { + CollectOperation.Reset(new TCollectOperation(std::get<0>(collectGenStep), std::get<1>(collectGenStep), + {} /* keep */, {} /* doNoKeep */, {} /* trashGoingToCollect */)); + if (std::get<1>(collectGenStep) == NextLogoBlobStep) { // advance to the next step if we are going to collect everything up to current one; otherwise we can keep // current step Step(); @@ -49,48 +54,26 @@ void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) { StartCollectingIfPossible(ctx); } +bool TKeyValueState::RemoveCollectedTrash(ISimpleDb &db, const TActorContext &ctx) { + if (auto& trash = CollectOperation->TrashGoingToCollect) { + ui32 collected = 0; + for (ui32 maxItemsToStore = 10'000; trash && maxItemsToStore; trash.pop_back(), --maxItemsToStore) { + const TLogoBlobID& id = trash.back(); + THelpers::DbEraseTrash(id, db, ctx); + ui32 num = Trash.erase(id); + Y_VERIFY(num == 1); + CountTrashCollected(id.BlobSize()); + ++collected; + } -void TKeyValueState::RemoveFromTrashDoNotKeep(ISimpleDb &db, const TActorContext &ctx, - const TVector<TLogoBlobID> &collectedDoNotKeep) -{ - for (const TLogoBlobID &id : collectedDoNotKeep) { - THelpers::DbEraseTrash(id, db, ctx); - ui32 num = Trash.erase(id); - Y_VERIFY(num == 1); - CountTrashCollected(id.BlobSize()); - } - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC24, "Remove from Trash", - (TabletId, TabletId), - (RemovedCount, collectedDoNotKeep.size()), - (TrashCount, Trash.size())); -} + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC24, "Remove from Trash", + (TabletId, TabletId), (RemovedCount, collected), (TrashCount, Trash.size())); -void TKeyValueState::RemoveFromTrashBySoftBarrier(ISimpleDb &db, const TActorContext &ctx, - const NKeyValue::THelpers::TGenerationStep &genStep) -{ - ui64 storedCollectGeneration = StoredState.GetCollectGeneration(); - ui64 storedCollectStep = StoredState.GetCollectStep(); - - // remove trash entries that were not marked as 'Keep' before, but which are automatically deleted by this barrier - // to prevent them from being added to 'DoNotKeep' list after - ui32 counter = 0; - for (auto it = Trash.begin(); it != Trash.end(); ) { - THelpers::TGenerationStep trashGenStep = THelpers::GenerationStep(*it); - bool afterStoredSoftBarrier = trashGenStep > THelpers::TGenerationStep(storedCollectGeneration, storedCollectStep); - bool beforeSoftBarrier = trashGenStep <= genStep; - if (afterStoredSoftBarrier && beforeSoftBarrier) { - CountTrashCollected(it->BlobSize()); - THelpers::DbEraseTrash(*it, db, ctx); - it = Trash.erase(it); - if (++counter >= MaxCollectGarbageFlagsPerMessage) { - RepeatGCTX = true; - break; - } - } else { - ++it; - } + return trash.empty(); } + + return true; } void TKeyValueState::UpdateStoredState(ISimpleDb &db, const TActorContext &ctx, @@ -101,90 +84,15 @@ void TKeyValueState::UpdateStoredState(ISimpleDb &db, const TActorContext &ctx, THelpers::DbUpdateState(StoredState, db, ctx); } -void TKeyValueState::UpdateAfterPartialGC(ISimpleDb &db, const TActorContext &ctx) { - RemoveFromTrashDoNotKeep(db, ctx, PartialCollectedDoNotKeep); -} - -void TKeyValueState::UpdateGC(ISimpleDb &db, const TActorContext &ctx, bool updateTrash, bool updateState) { - if (IsDamaged) { - return; - } - Y_VERIFY(CollectOperation); - - ui64 collectGeneration = CollectOperation->Header.GetCollectGeneration(); - ui64 collectStep = CollectOperation->Header.GetCollectStep(); - auto collectGenStep = THelpers::TGenerationStep(collectGeneration, collectStep); - - if (updateTrash) { - for (TLogoBlobID &id: CollectOperation->DoNotKeep) { - THelpers::DbEraseTrash(id, db, ctx); - ui32 num = Trash.erase(id); - Y_VERIFY(num == 1); - CountTrashCollected(id.BlobSize()); - } - - RemoveFromTrashBySoftBarrier(db, ctx, collectGenStep); - } - - if (updateState) { +void TKeyValueState::CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) { + if (RemoveCollectedTrash(db, ctx)) { + const ui32 collectGeneration = CollectOperation->Header.GetCollectGeneration(); + const ui32 collectStep = CollectOperation->Header.GetCollectStep(); + auto collectGenStep = THelpers::TGenerationStep(collectGeneration, collectStep); UpdateStoredState(db, ctx, collectGenStep); + } else { + RepeatGCTX = true; } - -} - -void TKeyValueState::StoreCollectExecute(ISimpleDb &db, const TActorContext &ctx) { - LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "StoreCollectExecute KeyValue# " << TabletId - << " IsDamaged# " << IsDamaged << " Marker# KV62"); - - if (IsDamaged) { - return; - } - Y_VERIFY(CollectOperation.Get()); - - // This operation will be executed no matter what - const ui32 collectGen = CollectOperation->Header.GetCollectGeneration(); - const ui32 collectStep = CollectOperation->Header.GetCollectStep(); - THelpers::DbUpdateCollect(collectGen, - collectStep, - CollectOperation->Keep, - CollectOperation->DoNotKeep, - db, ctx); - - UpdateGC(db, ctx, true, false); -} - -void TKeyValueState::StoreCollectComplete(const TActorContext &ctx) { - ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCollect()); -} - -void TKeyValueState::EraseCollectExecute(ISimpleDb &db, const TActorContext &ctx) { - LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "EraseCollectExecute KeyValue# " << TabletId - << " IsDamaged# " << IsDamaged << " Marker# KV63"); - if (IsDamaged) { - return; - } - Y_VERIFY(CollectOperation); - // Erase the collect operation - THelpers::DbEraseCollect(db, ctx); - // Update the state - UpdateGC(db, ctx, false, true); -} - -void TKeyValueState::EraseCollectComplete(const TActorContext &ctx) { - Y_VERIFY(CollectOperation); - CollectOperation.Reset(nullptr); - IsCollectEventSent = false; - - // Start new collect operation if needed - PrepareCollectIfNeeded(ctx); -} - -void TKeyValueState::CompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) { - ui64 collectGeneration = CollectOperation->Header.GetCollectGeneration(); - ui64 collectStep = CollectOperation->Header.GetCollectStep(); - auto collectGenStep = THelpers::TGenerationStep(collectGeneration, collectStep); - RemoveFromTrashBySoftBarrier(db, ctx, collectGenStep); - UpdateStoredState(db, ctx, collectGenStep); } void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) { @@ -205,35 +113,21 @@ void TKeyValueState::CompleteGCComplete(const TActorContext &ctx) { PrepareCollectIfNeeded(ctx); } - -void TKeyValueState::PartialCompleteGCExecute(ISimpleDb &db, const TActorContext &ctx) { - UpdateAfterPartialGC(db, ctx); -} - -void TKeyValueState::PartialCompleteGCComplete(const TActorContext &ctx) { - PartialCollectedDoNotKeep.clear(); - STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC23, "Send ContinueGC", - (TabletId, TabletId), - (TrashCount, Trash.size())); - ctx.Send(CollectorActorId, new TEvKeyValue::TEvContinueGC(std::move(PartialCollectedDoNotKeep))); -} - -// Prepare the completely new full collect operation with the same gen/step, but with correct keep & doNotKeep lists -void TKeyValueState::SendStoreCollect(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, - TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep) { - ui32 generation, step; - std::tie(generation, step) = genStep; - CollectOperation.Reset(new TCollectOperation(generation, step, std::move(keep), std::move(doNotKeep))); - ctx.Send(KeyValueActorId, new TEvKeyValue::TEvStoreCollect()); -} - -void TKeyValueState::StartGC(const TActorContext &ctx, const THelpers::TGenerationStep &genStep, - TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep) -{ - ui32 generation, step; - std::tie(generation, step) = genStep; - CollectOperation.Reset(new TCollectOperation(generation, step, std::move(keep), std::move(doNotKeep))); +void TKeyValueState::StartGC(const TActorContext &ctx, TVector<TLogoBlobID> &keep, TVector<TLogoBlobID> &doNotKeep, + TVector<TLogoBlobID>& trashGoingToCollect) { + // ensure we haven't filled these fields yet + Y_VERIFY(CollectOperation); + Y_VERIFY(!CollectOperation->Keep); + Y_VERIFY(!CollectOperation->DoNotKeep); + Y_VERIFY(!CollectOperation->TrashGoingToCollect); + // fill in correct values + CollectOperation->Keep = std::move(keep); + CollectOperation->DoNotKeep = std::move(doNotKeep); + CollectOperation->TrashGoingToCollect = std::move(trashGoingToCollect); + // issue command to collector ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCollect()); + Y_VERIFY(!IsCollectEventSent); + IsCollectEventSent = true; } void TKeyValueState::StartCollectingIfPossible(const TActorContext &ctx) { @@ -241,9 +135,7 @@ void TKeyValueState::StartCollectingIfPossible(const TActorContext &ctx) { << " IsCollectEventSent# " << IsCollectEventSent << " Marker# KV64"); // there is nothing to collect yet, or the event was already sent - if (!CollectOperation || IsCollectEventSent) { - return; - } + Y_VERIFY(CollectOperation && !IsCollectEventSent); // create generation:step barrier tuple for proposed garbage collection command const auto &header = CollectOperation->Header; @@ -251,19 +143,10 @@ void TKeyValueState::StartCollectingIfPossible(const TActorContext &ctx) { // if we have some in flight writes, check if they do not overlap with the new barrier if (InFlightForStep) { - // get the first item from the map with the least value; its key contains step of blob being written - auto leastWriteIt = InFlightForStep.begin(); - // construct generation:step pair of the oldest in flight blob - auto inFlightGenStep = THelpers::TGenerationStep(ExecutorGeneration, leastWriteIt->first); - // check if the new barrier would delete blob being written, in this case hold with the collect operation - if (inFlightGenStep <= collectGenStep) { - return; - } + const auto& [step, _] = *InFlightForStep.begin(); + Y_VERIFY(collectGenStep < THelpers::TGenerationStep(ExecutorGeneration, step)); } - Y_VERIFY(!IsCollectEventSent); - IsCollectEventSent = true; - // create stored (previously issued) generation:step barrier as a tuple auto storedGenStep = THelpers::TGenerationStep(StoredState.GetCollectGeneration(), StoredState.GetCollectStep()); @@ -283,18 +166,30 @@ void TKeyValueState::StartCollectingIfPossible(const TActorContext &ctx) { // create list of blobs that must have to DoNotKeep flag set; these blobs must have Keep flag written and reside in // Trash now TVector<TLogoBlobID> doNotKeep; + TVector<TLogoBlobID> trashGoingToCollect; doNotKeep.reserve(Trash.size()); - for (const TLogoBlobID &id : Trash) { - if (THelpers::GenerationStep(id) <= storedGenStep) { + trashGoingToCollect.reserve(Trash.size()); + + for (const TLogoBlobID& id : Trash) { + auto genStep = THelpers::GenerationStep(id); + if (collectGenStep < genStep) { + break; + } + if (genStep <= storedGenStep || id.Generation() < ExecutorGeneration) { // assume Keep flag was issued to these blobs doNotKeep.push_back(id); } + Y_VERIFY(genStep <= collectGenStep); + trashGoingToCollect.push_back(id); } doNotKeep.shrink_to_fit(); + trashGoingToCollect.shrink_to_fit(); + + Y_VERIFY(trashGoingToCollect); LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "StartCollectingIfPossible KeyValue# " << TabletId << "Flags Keep.Size# " << keep.size() << " DoNotKeep.Size# " << doNotKeep.size() << " Marker# KV65"); - StartGC(ctx, collectGenStep, keep, doNotKeep); + StartGC(ctx, keep, doNotKeep, trashGoingToCollect); } ui64 TKeyValueState::OnEvCollect(const TActorContext &ctx) { @@ -317,18 +212,9 @@ void TKeyValueState::OnEvCollectDone(ui64 perGenerationCounterStepSize, TActorId CollectorActorId = collector; } -void TKeyValueState::OnEvEraseCollect(const TActorContext &ctx) { - Y_UNUSED(ctx); - CountLatencyBsCollect(); -} - void TKeyValueState::OnEvCompleteGC() { CountLatencyBsCollect(); } -void TKeyValueState::OnEvPartialCompleteGC(TEvKeyValue::TEvPartialCompleteGC *ev) { - PartialCollectedDoNotKeep = std::move(ev->CollectedDoNotKeep); -} - } // NKeyValue } // NKikimr diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp index a8babc46c12..2f2d8118f08 100644 --- a/ydb/core/keyvalue/keyvalue_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_ut.cpp @@ -1125,9 +1125,6 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI savedInitialEvents.pop(); } - TDispatchOptions options3; - options3.FinalEvents.push_back(TEvKeyValue::TEvEraseCollect::EventType); - TestLog("Third dispatch ", collectStep); UNIT_ASSERT_VALUES_EQUAL(collectStep, 2); |