aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2025-03-31 22:49:08 +0500
committerGitHub <noreply@github.com>2025-03-31 20:49:08 +0300
commitba56e6037c80d57513c9acc90f517e7c0e44ce4c (patch)
treef64280dd6321851b4f6670e82d04c1a06598fdcb
parent4534474a990eb9d15d21b8bbb528a47f3ce809bb (diff)
downloadydb-ba56e6037c80d57513c9acc90f517e7c0e44ce4c.tar.gz
Complex messages expiration in the topic (#16413)
-rw-r--r--ydb/core/persqueue/partition.cpp15
-rw-r--r--ydb/core/persqueue/ut/utils_ut.cpp33
-rw-r--r--ydb/core/persqueue/utils.cpp7
-rw-r--r--ydb/core/protos/pqconfig.proto7
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp3
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp3
7 files changed, 61 insertions, 10 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 6ca8e04621..b3907018ba 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -429,6 +429,7 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont
const TDuration lifetimeLimit{TDuration::Seconds(partConfig.GetLifetimeSeconds())};
const bool hasStorageLimit = partConfig.HasStorageLimitBytes();
+ const auto hasLifetime = !hasStorageLimit || (partConfig.HasLifetimeSeconds() && partConfig.GetLifetimeSeconds() > 0);
const auto now = ctx.Now();
const ui64 importantConsumerMinOffset = ImportantClientsMinOffset();
@@ -446,15 +447,11 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont
}
auto& firstKey = DataKeysBody.front();
- if (hasStorageLimit) {
- const auto bodySize = BodySize - firstKey.Size;
- if (bodySize < partConfig.GetStorageLimitBytes()) {
- break;
- }
- } else {
- if (now < firstKey.Timestamp + lifetimeLimit) {
- break;
- }
+
+ auto expiredByLifetime = hasLifetime && now >= firstKey.Timestamp + lifetimeLimit;
+ auto expiredByStorageLimit = hasStorageLimit && BodySize > firstKey.Size && (BodySize - firstKey.Size) >= partConfig.GetStorageLimitBytes();
+ if (!expiredByLifetime && !expiredByStorageLimit) {
+ break;
}
BodySize -= firstKey.Size;
diff --git a/ydb/core/persqueue/ut/utils_ut.cpp b/ydb/core/persqueue/ut/utils_ut.cpp
index 85513ea70a..dc064c02c6 100644
--- a/ydb/core/persqueue/ut/utils_ut.cpp
+++ b/ydb/core/persqueue/ut/utils_ut.cpp
@@ -76,6 +76,39 @@ Y_UNIT_TEST_SUITE(TPQUtilsTest) {
UNIT_ASSERT_VALUES_EQUAL(r, 2);
}
}
+
+ Y_UNIT_TEST(Migration_Lifetime) {
+ {
+ NKikimrPQ::TPQTabletConfig config;
+ config.MutablePartitionConfig()->SetLifetimeSeconds(123);
+ NKikimr::NPQ::Migrate(config);
+
+ UNIT_ASSERT_VALUES_EQUAL(true, config.GetMigrations().GetLifetime());
+ UNIT_ASSERT_VALUES_EQUAL(false, config.GetPartitionConfig().HasStorageLimitBytes());
+ UNIT_ASSERT_VALUES_EQUAL(123, config.GetPartitionConfig().GetLifetimeSeconds());
+ }
+ {
+ NKikimrPQ::TPQTabletConfig config;
+ config.MutablePartitionConfig()->SetLifetimeSeconds(123);
+ config.MutablePartitionConfig()->SetStorageLimitBytes(456);
+ NKikimr::NPQ::Migrate(config);
+
+ UNIT_ASSERT_VALUES_EQUAL(true, config.GetMigrations().GetLifetime());
+ UNIT_ASSERT_VALUES_EQUAL(456, config.GetPartitionConfig().GetStorageLimitBytes());
+ UNIT_ASSERT_VALUES_EQUAL(TDuration::Days(3650).Seconds(), config.GetPartitionConfig().GetLifetimeSeconds());
+ }
+ {
+ NKikimrPQ::TPQTabletConfig config;
+ config.MutableMigrations()->SetLifetime(true);
+ config.MutablePartitionConfig()->SetLifetimeSeconds(123);
+ config.MutablePartitionConfig()->SetStorageLimitBytes(456);
+ NKikimr::NPQ::Migrate(config);
+
+ UNIT_ASSERT_VALUES_EQUAL(true, config.GetMigrations().GetLifetime());
+ UNIT_ASSERT_VALUES_EQUAL(456, config.GetPartitionConfig().GetStorageLimitBytes());
+ UNIT_ASSERT_VALUES_EQUAL(123, config.GetPartitionConfig().GetLifetimeSeconds());
+ }
+ }
}
}
diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp
index 7f7ca3d81e..7367fc471c 100644
--- a/ydb/core/persqueue/utils.cpp
+++ b/ydb/core/persqueue/utils.cpp
@@ -116,6 +116,13 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) {
config.AddAllPartitions()->CopyFrom(partition);
}
}
+
+ if (!config.GetMigrations().GetLifetime()) {
+ if (config.GetPartitionConfig().HasStorageLimitBytes()) {
+ config.MutablePartitionConfig()->SetLifetimeSeconds(TDuration::Days(3650).Seconds());
+ }
+ }
+ config.MutableMigrations()->SetLifetime(true);
}
bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) {
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index b2f9d70972..d64aeb45f9 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -421,7 +421,7 @@ message TPQTabletConfig {
optional uint32 MinPartitionCount = 1 [default = 1];
// The maximum number of partitions that will be supported by the strategy. The strategy will not create partitions if the specified
// amount is reached, even if the load exceeds the current capabilities of the topic.
- optional uint32 MaxPartitionCount = 2 [default = 1];;
+ optional uint32 MaxPartitionCount = 2 [default = 1];
optional uint32 ScaleThresholdSeconds = 3 [default = 300];
optional uint32 ScaleUpPartitionWriteSpeedThresholdPercent = 4 [default = 80];
optional uint32 ScaleDownPartitionWriteSpeedThresholdPercent = 5 [default = 20];
@@ -433,6 +433,11 @@ message TPQTabletConfig {
repeated TPartition AllPartitions = 36; // filled by schemeshard
optional TOffloadConfig OffloadConfig = 38;
+
+ message TMigrations {
+ optional bool Lifetime = 1 [default = false];
+ }
+ optional TMigrations Migrations = 39;
}
message THeartbeat {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
index c6dd1aa23a..a2487e7f95 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
@@ -217,6 +217,8 @@ public:
TPath::Resolve(alterConfig.GetOffloadConfig().GetIncrementalBackup().GetDstPath(), context.SS).Base()->PathId.ToProto(pathId);
}
+ alterConfig.MutableMigrations()->CopyFrom(tabletConfig->GetMigrations());
+
alterConfig.MutablePartitionKeySchema()->Swap(tabletConfig->MutablePartitionKeySchema());
Y_PROTOBUF_SUPPRESS_NODISCARD alterConfig.SerializeToString(&params->TabletConfig);
alterConfig.Swap(tabletConfig);
@@ -557,6 +559,7 @@ public:
}
NKikimrPQ::TPQTabletConfig tabletConfig = topic->GetTabletConfig();
+ NKikimr::NPQ::Migrate(tabletConfig);
NKikimrPQ::TPQTabletConfig newTabletConfig = tabletConfig;
TTopicInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
index c472b8adf5..ba4dd86c52 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
@@ -168,6 +168,9 @@ TTopicInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
}
NKikimrPQ::TPQTabletConfig tabletConfig = op.GetPQTabletConfig();
+
+ tabletConfig.MutableMigrations()->SetLifetime(true);
+
tabletConfig.ClearPartitionIds();
tabletConfig.ClearPartitions();
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 322c61ea17..c92883fea2 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -5428,6 +5428,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Version: 567
Important: true
}
+ Migrations {
+ Lifetime: true
+ }
}
ErrorCode: OK
}