diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2025-03-31 22:49:08 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-31 20:49:08 +0300 |
commit | ba56e6037c80d57513c9acc90f517e7c0e44ce4c (patch) | |
tree | f64280dd6321851b4f6670e82d04c1a06598fdcb | |
parent | 4534474a990eb9d15d21b8bbb528a47f3ce809bb (diff) | |
download | ydb-ba56e6037c80d57513c9acc90f517e7c0e44ce4c.tar.gz |
Complex messages expiration in the topic (#16413)
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 15 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/utils_ut.cpp | 33 | ||||
-rw-r--r-- | ydb/core/persqueue/utils.cpp | 7 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 7 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp | 3 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 3 |
7 files changed, 61 insertions, 10 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 6ca8e04621..b3907018ba 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -429,6 +429,7 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont const TDuration lifetimeLimit{TDuration::Seconds(partConfig.GetLifetimeSeconds())}; const bool hasStorageLimit = partConfig.HasStorageLimitBytes(); + const auto hasLifetime = !hasStorageLimit || (partConfig.HasLifetimeSeconds() && partConfig.GetLifetimeSeconds() > 0); const auto now = ctx.Now(); const ui64 importantConsumerMinOffset = ImportantClientsMinOffset(); @@ -446,15 +447,11 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont } auto& firstKey = DataKeysBody.front(); - if (hasStorageLimit) { - const auto bodySize = BodySize - firstKey.Size; - if (bodySize < partConfig.GetStorageLimitBytes()) { - break; - } - } else { - if (now < firstKey.Timestamp + lifetimeLimit) { - break; - } + + auto expiredByLifetime = hasLifetime && now >= firstKey.Timestamp + lifetimeLimit; + auto expiredByStorageLimit = hasStorageLimit && BodySize > firstKey.Size && (BodySize - firstKey.Size) >= partConfig.GetStorageLimitBytes(); + if (!expiredByLifetime && !expiredByStorageLimit) { + break; } BodySize -= firstKey.Size; diff --git a/ydb/core/persqueue/ut/utils_ut.cpp b/ydb/core/persqueue/ut/utils_ut.cpp index 85513ea70a..dc064c02c6 100644 --- a/ydb/core/persqueue/ut/utils_ut.cpp +++ b/ydb/core/persqueue/ut/utils_ut.cpp @@ -76,6 +76,39 @@ Y_UNIT_TEST_SUITE(TPQUtilsTest) { UNIT_ASSERT_VALUES_EQUAL(r, 2); } } + + Y_UNIT_TEST(Migration_Lifetime) { + { + NKikimrPQ::TPQTabletConfig config; + config.MutablePartitionConfig()->SetLifetimeSeconds(123); + NKikimr::NPQ::Migrate(config); + + UNIT_ASSERT_VALUES_EQUAL(true, config.GetMigrations().GetLifetime()); + UNIT_ASSERT_VALUES_EQUAL(false, config.GetPartitionConfig().HasStorageLimitBytes()); + UNIT_ASSERT_VALUES_EQUAL(123, config.GetPartitionConfig().GetLifetimeSeconds()); + } + { + NKikimrPQ::TPQTabletConfig config; + config.MutablePartitionConfig()->SetLifetimeSeconds(123); + config.MutablePartitionConfig()->SetStorageLimitBytes(456); + NKikimr::NPQ::Migrate(config); + + UNIT_ASSERT_VALUES_EQUAL(true, config.GetMigrations().GetLifetime()); + UNIT_ASSERT_VALUES_EQUAL(456, config.GetPartitionConfig().GetStorageLimitBytes()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Days(3650).Seconds(), config.GetPartitionConfig().GetLifetimeSeconds()); + } + { + NKikimrPQ::TPQTabletConfig config; + config.MutableMigrations()->SetLifetime(true); + config.MutablePartitionConfig()->SetLifetimeSeconds(123); + config.MutablePartitionConfig()->SetStorageLimitBytes(456); + NKikimr::NPQ::Migrate(config); + + UNIT_ASSERT_VALUES_EQUAL(true, config.GetMigrations().GetLifetime()); + UNIT_ASSERT_VALUES_EQUAL(456, config.GetPartitionConfig().GetStorageLimitBytes()); + UNIT_ASSERT_VALUES_EQUAL(123, config.GetPartitionConfig().GetLifetimeSeconds()); + } + } } } diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index 7f7ca3d81e..7367fc471c 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -116,6 +116,13 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) { config.AddAllPartitions()->CopyFrom(partition); } } + + if (!config.GetMigrations().GetLifetime()) { + if (config.GetPartitionConfig().HasStorageLimitBytes()) { + config.MutablePartitionConfig()->SetLifetimeSeconds(TDuration::Days(3650).Seconds()); + } + } + config.MutableMigrations()->SetLifetime(true); } bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index b2f9d70972..d64aeb45f9 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -421,7 +421,7 @@ message TPQTabletConfig { optional uint32 MinPartitionCount = 1 [default = 1]; // The maximum number of partitions that will be supported by the strategy. The strategy will not create partitions if the specified // amount is reached, even if the load exceeds the current capabilities of the topic. - optional uint32 MaxPartitionCount = 2 [default = 1];; + optional uint32 MaxPartitionCount = 2 [default = 1]; optional uint32 ScaleThresholdSeconds = 3 [default = 300]; optional uint32 ScaleUpPartitionWriteSpeedThresholdPercent = 4 [default = 80]; optional uint32 ScaleDownPartitionWriteSpeedThresholdPercent = 5 [default = 20]; @@ -433,6 +433,11 @@ message TPQTabletConfig { repeated TPartition AllPartitions = 36; // filled by schemeshard optional TOffloadConfig OffloadConfig = 38; + + message TMigrations { + optional bool Lifetime = 1 [default = false]; + } + optional TMigrations Migrations = 39; } message THeartbeat { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp index c6dd1aa23a..a2487e7f95 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp @@ -217,6 +217,8 @@ public: TPath::Resolve(alterConfig.GetOffloadConfig().GetIncrementalBackup().GetDstPath(), context.SS).Base()->PathId.ToProto(pathId); } + alterConfig.MutableMigrations()->CopyFrom(tabletConfig->GetMigrations()); + alterConfig.MutablePartitionKeySchema()->Swap(tabletConfig->MutablePartitionKeySchema()); Y_PROTOBUF_SUPPRESS_NODISCARD alterConfig.SerializeToString(¶ms->TabletConfig); alterConfig.Swap(tabletConfig); @@ -557,6 +559,7 @@ public: } NKikimrPQ::TPQTabletConfig tabletConfig = topic->GetTabletConfig(); + NKikimr::NPQ::Migrate(tabletConfig); NKikimrPQ::TPQTabletConfig newTabletConfig = tabletConfig; TTopicInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp index c472b8adf5..ba4dd86c52 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp @@ -168,6 +168,9 @@ TTopicInfo::TPtr CreatePersQueueGroup(TOperationContext& context, } NKikimrPQ::TPQTabletConfig tabletConfig = op.GetPQTabletConfig(); + + tabletConfig.MutableMigrations()->SetLifetime(true); + tabletConfig.ClearPartitionIds(); tabletConfig.ClearPartitions(); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 322c61ea17..c92883fea2 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -5428,6 +5428,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Version: 567 Important: true } + Migrations { + Lifetime: true + } } ErrorCode: OK } |