summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/persqueue/partition.cpp50
-rw-r--r--ydb/core/persqueue/partition.h4
-rw-r--r--ydb/core/persqueue/partition_write.cpp2
3 files changed, 10 insertions, 46 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index c6b5b52b00c..81647207078 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -244,7 +244,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
WriteTimestampEstimate = now;
THolder <TEvKeyValue::TEvRequest> request = MakeHolder<TEvKeyValue::TEvRequest>();
- bool haveChanges = CleanUp(request.Get(), false, ctx);
+ bool haveChanges = CleanUp(request.Get(), ctx);
if (DiskIsFull) {
AddCheckDiskRequest(request.Get(), Config.GetPartitionConfig().GetNumChannels());
haveChanges = true;
@@ -280,8 +280,8 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
}
-bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx) {
- bool haveChanges = CleanUpBlobs(request, hasWrites, ctx);
+bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx) {
+ bool haveChanges = CleanUpBlobs(request, ctx);
LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " <<
request->Record.CmdDeleteRangeSize() << " items to delete old stuff");
@@ -297,7 +297,7 @@ bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const
return haveChanges;
}
-bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx) {
+bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorContext& ctx) {
if (StartOffset == EndOffset || DataKeysBody.size() <= 1)
return false;
@@ -350,53 +350,17 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites,
}
}
- TDataKey lastKey = HeadKeys.empty() ? DataKeysBody.back() : HeadKeys.back();
-
- if (!hasWrites &&
- ctx.Now() >= lastKey.Timestamp + lifetimeLimit &&
- minOffset == EndOffset &&
- false) { // disable drop of all data
- Y_VERIFY(!HeadKeys.empty() || !DataKeysBody.empty());
-
- Y_VERIFY(CompactedKeys.empty());
- Y_VERIFY(NewHead.PackedSize == 0);
- Y_VERIFY(NewHeadKey.Size == 0);
-
- Y_VERIFY(EndOffset == Head.GetNextOffset());
- Y_VERIFY(EndOffset == NewHead.GetNextOffset() || NewHead.GetNextOffset() == 0);
-
- hasDrop = true;
-
- BodySize = 0;
- DataKeysBody.clear();
- GapSize = 0;
- GapOffsets.clear();
-
- for (ui32 i = 0; i < TotalLevels; ++i) {
- DataKeysHead[i].Clear();
- }
- HeadKeys.clear();
- Head.Clear();
- Head.Offset = EndOffset;
- NewHead.Clear();
- NewHead.Offset = EndOffset;
- endOffset = EndOffset;
- } else {
- if (hasDrop) {
- lastKey = DataKeysBody.front();
- }
- }
-
if (!hasDrop)
return false;
StartOffset = endOffset;
- TKey key(TKeyPrefix::TypeData, Partition, 0, 0, 0, 0); //will drop all that could not be dropped before of case of full disks
+ TKey firstKey(TKeyPrefix::TypeData, Partition, 0, 0, 0, 0); //will drop all that could not be dropped before of case of full disks
+ const TDataKey& lastKey = DataKeysBody.front();
auto del = request->Record.AddCmdDeleteRange();
auto range = del->MutableRange();
- range->SetFrom(key.Data(), key.Size());
+ range->SetFrom(firstKey.Data(), firstKey.Size());
range->SetIncludeFrom(true);
range->SetTo(lastKey.Key.Data(), lastKey.Key.Size());
range->SetIncludeTo(StartOffset == EndOffset);
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 544ef7d6f06..4aab43956ad 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -196,8 +196,8 @@ private:
TInstant GetWriteTimeEstimate(ui64 offset) const;
bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter);
- bool CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx);
- bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx);
+ bool CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
+ bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorContext& ctx);
bool IsQuotingEnabled() const;
bool ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, const TActorContext& ctx);
bool WaitingForPreviousBlobQuota() const;
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 9d73e0ba85c..a8b540a88ad 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -1350,7 +1350,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) {
} else {
haveData = ProcessWrites(request.Get(), now, ctx);
}
- bool haveDrop = CleanUp(request.Get(), haveData, ctx);
+ bool haveDrop = CleanUp(request.Get(), ctx);
ProcessReserveRequests(ctx);
if (!haveData && !haveDrop && !haveCheckDisk) { //no data writed/deleted