diff options
| -rw-r--r-- | ydb/core/persqueue/partition.cpp | 50 | ||||
| -rw-r--r-- | ydb/core/persqueue/partition.h | 4 | ||||
| -rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 2 |
3 files changed, 10 insertions, 46 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index c6b5b52b00c..81647207078 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -244,7 +244,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) { WriteTimestampEstimate = now; THolder <TEvKeyValue::TEvRequest> request = MakeHolder<TEvKeyValue::TEvRequest>(); - bool haveChanges = CleanUp(request.Get(), false, ctx); + bool haveChanges = CleanUp(request.Get(), ctx); if (DiskIsFull) { AddCheckDiskRequest(request.Get(), Config.GetPartitionConfig().GetNumChannels()); haveChanges = true; @@ -280,8 +280,8 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) { } -bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx) { - bool haveChanges = CleanUpBlobs(request, hasWrites, ctx); +bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx) { + bool haveChanges = CleanUpBlobs(request, ctx); LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete old stuff"); @@ -297,7 +297,7 @@ bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const return haveChanges; } -bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx) { +bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorContext& ctx) { if (StartOffset == EndOffset || DataKeysBody.size() <= 1) return false; @@ -350,53 +350,17 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, } } - TDataKey lastKey = HeadKeys.empty() ? DataKeysBody.back() : HeadKeys.back(); - - if (!hasWrites && - ctx.Now() >= lastKey.Timestamp + lifetimeLimit && - minOffset == EndOffset && - false) { // disable drop of all data - Y_VERIFY(!HeadKeys.empty() || !DataKeysBody.empty()); - - Y_VERIFY(CompactedKeys.empty()); - Y_VERIFY(NewHead.PackedSize == 0); - Y_VERIFY(NewHeadKey.Size == 0); - - Y_VERIFY(EndOffset == Head.GetNextOffset()); - Y_VERIFY(EndOffset == NewHead.GetNextOffset() || NewHead.GetNextOffset() == 0); - - hasDrop = true; - - BodySize = 0; - DataKeysBody.clear(); - GapSize = 0; - GapOffsets.clear(); - - for (ui32 i = 0; i < TotalLevels; ++i) { - DataKeysHead[i].Clear(); - } - HeadKeys.clear(); - Head.Clear(); - Head.Offset = EndOffset; - NewHead.Clear(); - NewHead.Offset = EndOffset; - endOffset = EndOffset; - } else { - if (hasDrop) { - lastKey = DataKeysBody.front(); - } - } - if (!hasDrop) return false; StartOffset = endOffset; - TKey key(TKeyPrefix::TypeData, Partition, 0, 0, 0, 0); //will drop all that could not be dropped before of case of full disks + TKey firstKey(TKeyPrefix::TypeData, Partition, 0, 0, 0, 0); //will drop all that could not be dropped before of case of full disks + const TDataKey& lastKey = DataKeysBody.front(); auto del = request->Record.AddCmdDeleteRange(); auto range = del->MutableRange(); - range->SetFrom(key.Data(), key.Size()); + range->SetFrom(firstKey.Data(), firstKey.Size()); range->SetIncludeFrom(true); range->SetTo(lastKey.Key.Data(), lastKey.Key.Size()); range->SetIncludeTo(StartOffset == EndOffset); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 544ef7d6f06..4aab43956ad 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -196,8 +196,8 @@ private: TInstant GetWriteTimeEstimate(ui64 offset) const; bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter); - bool CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx); - bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx); + bool CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx); + bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorContext& ctx); bool IsQuotingEnabled() const; bool ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, const TActorContext& ctx); bool WaitingForPreviousBlobQuota() const; diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 9d73e0ba85c..a8b540a88ad 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -1350,7 +1350,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) { } else { haveData = ProcessWrites(request.Get(), now, ctx); } - bool haveDrop = CleanUp(request.Get(), haveData, ctx); + bool haveDrop = CleanUp(request.Get(), ctx); ProcessReserveRequests(ctx); if (!haveData && !haveDrop && !haveCheckDisk) { //no data writed/deleted |
