summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/kafka_proxy/actors/control_plane_common.h10
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp14
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp40
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.cpp9
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.h1
-rw-r--r--ydb/core/kafka_proxy/kafka_constants.h6
-rw-r--r--ydb/core/kafka_proxy/kafka_events.h3
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp80
-rw-r--r--ydb/core/persqueue/partition.cpp5
-rw-r--r--ydb/core/persqueue/partition_blob_encoder.h1
-rw-r--r--ydb/core/persqueue/partition_init.cpp1
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.cpp3
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.h1
-rw-r--r--ydb/core/persqueue/ut/pq_ut.cpp30
-rw-r--r--ydb/core/protos/pqconfig.proto3
15 files changed, 197 insertions, 10 deletions
diff --git a/ydb/core/kafka_proxy/actors/control_plane_common.h b/ydb/core/kafka_proxy/actors/control_plane_common.h
index f92bba2cc08..166fa895fec 100644
--- a/ydb/core/kafka_proxy/actors/control_plane_common.h
+++ b/ydb/core/kafka_proxy/actors/control_plane_common.h
@@ -109,6 +109,7 @@ inline std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ValidateTo
}
}
+
template<class T>
inline std::unordered_set<TString> ExtractDuplicates(
std::vector<T>& source,
@@ -377,4 +378,13 @@ private:
}
};
+enum class ECleanupPolicy {
+ DELETE,
+ COMPACT,
+ UNKNOWN
+};
+
+std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ConvertCleanupPolicy(const std::optional<TString>& configValue,
+ std::optional<ECleanupPolicy>& cleanupPolicy);
+
} //namespace NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp
index 6b7c86c9995..a3b79c9ede1 100644
--- a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp
@@ -39,7 +39,8 @@ public:
TString topicPath,
TString databaseName,
std::optional<ui64> retentionMs,
- std::optional<ui64> retentionBytes)
+ std::optional<ui64> retentionBytes,
+ std::optional<ECleanupPolicy> cleanupPolicy)
: TAlterTopicActor<TAlterConfigsActor, TKafkaAlterConfigsRequest>(
requester,
userToken,
@@ -47,6 +48,7 @@ public:
databaseName)
, RetentionMs(retentionMs)
, RetentionBytes(retentionBytes)
+ , CleanupPolicy(cleanupPolicy)
{
KAFKA_LOG_D("Alter configs actor. DatabaseName: " << databaseName << ". TopicPath: " << TopicPath);
};
@@ -72,11 +74,15 @@ public:
if (RetentionBytes.has_value()) {
partitionConfig->SetStorageLimitBytes(RetentionBytes.value());
}
+ if (CleanupPolicy.has_value()) {
+ groupConfig.MutablePQTabletConfig()->SetEnableCompactification(CleanupPolicy.value() == ECleanupPolicy::COMPACT);
+ }
}
private:
std::optional<ui64> RetentionMs;
std::optional<ui64> RetentionBytes;
+ std::optional<ECleanupPolicy> CleanupPolicy;
};
NActors::IActor* CreateKafkaAlterConfigsActor(
@@ -116,6 +122,7 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) {
std::optional<TString> retentionMs;
std::optional<TString> retentionBytes;
+ std::optional<ECleanupPolicy> cleanupPolicy;
std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> unsupportedConfigResponse;
@@ -129,6 +136,8 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) {
retentionMs = config.Value;
} else if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) {
retentionBytes = config.Value;
+ } else if (config.Name.value() == CLEANUP_POLICY) {
+ unsupportedConfigResponse = ConvertCleanupPolicy(config.Value, cleanupPolicy);
}
}
@@ -150,7 +159,8 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) {
resource.ResourceName.value(),
Context->DatabasePath,
convertedRetentions.Ms,
- convertedRetentions.Bytes
+ convertedRetentions.Bytes,
+ cleanupPolicy
));
InflyTopics++;
diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
index cb6062d351b..968c16239da 100644
--- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
@@ -11,6 +11,26 @@
namespace NKafka {
+std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ConvertCleanupPolicy(
+ const std::optional<TString>& configValue, std::optional<ECleanupPolicy>& cleanupPolicy
+) {
+ if (configValue.value_or("") == "delete") {
+ cleanupPolicy = ECleanupPolicy::DELETE;
+ return std::nullopt;
+ } else if (configValue.value_or("") == "compact") {
+ cleanupPolicy = ECleanupPolicy::COMPACT;
+ return std::nullopt;
+ }
+ auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>();
+ result->Status = EKafkaErrors::INVALID_REQUEST;
+ result->Message = TStringBuilder()
+ << "Topic-level config '"
+ << CLEANUP_POLICY
+ << "' has invalid/unsupported value: "
+ << configValue.value_or("");
+ return result;
+}
+
class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreateTopicActor, TKafkaTopicRequestCtx> {
using TBase = NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreateTopicActor, TKafkaTopicRequestCtx>;
public:
@@ -22,7 +42,8 @@ public:
TString databaseName,
ui32 partitionsNumber,
std::optional<ui64> retentionMs,
- std::optional<ui64> retentionBytes)
+ std::optional<ui64> retentionBytes,
+ std::optional<ECleanupPolicy> cleanupPolicy)
: TBase(new TKafkaTopicRequestCtx(
userToken,
topicPath,
@@ -36,6 +57,7 @@ public:
, PartionsNumber(partitionsNumber)
, RetentionMs(retentionMs)
, RetentionBytes(retentionBytes)
+ , CleanupPolicy(cleanupPolicy)
{
KAFKA_LOG_D(LogMessage(databaseName));
};
@@ -62,6 +84,9 @@ public:
auto pqDescr = modifyScheme.MutableCreatePersQueueGroup();
pqDescr->SetPartitionPerTablet(1);
+ if (CleanupPolicy.value_or(ECleanupPolicy::UNKNOWN) == ECleanupPolicy::COMPACT) {
+ pqDescr->MutablePQTabletConfig()->SetEnableCompactification(true);
+ }
Ydb::Topic::CreateTopicRequest topicRequest;
topicRequest.mutable_partitioning_settings()->set_min_active_partitions(PartionsNumber);
@@ -110,6 +135,7 @@ private:
const ui32 PartionsNumber;
std::optional<ui64> RetentionMs;
std::optional<ui64> RetentionBytes;
+ std::optional<ECleanupPolicy> CleanupPolicy;
TStringBuilder LogMessage(TString& databaseName) {
TStringBuilder stringBuilder = TStringBuilder()
@@ -122,6 +148,9 @@ private:
if (RetentionBytes.has_value()) {
stringBuilder << ". RetentionBytes: " << RetentionBytes.value();
}
+ if (CleanupPolicy.has_value() && CleanupPolicy.value() == ECleanupPolicy::COMPACT) {
+ stringBuilder << ". CleaunpPolicy: compact";
+ }
return stringBuilder;
}
};
@@ -163,6 +192,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
std::optional<TString> retentionMs;
std::optional<TString> retentionBytes;
+ std::optional<ECleanupPolicy> cleanupPolicy;
std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> unsupportedConfigResponse;
@@ -175,6 +205,11 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
retentionMs = config.Value;
} else if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) {
retentionBytes = config.Value;
+ } else if (config.Name.value() == CLEANUP_POLICY) {
+ unsupportedConfigResponse = ConvertCleanupPolicy(config.Value, cleanupPolicy);
+ if (unsupportedConfigResponse.has_value()) {
+ break;
+ }
}
}
@@ -202,7 +237,8 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
Context->DatabasePath,
topic.NumPartitions,
convertedRetentions.Ms,
- convertedRetentions.Bytes
+ convertedRetentions.Bytes,
+ cleanupPolicy
));
InflyTopics++;
diff --git a/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.cpp
index 824a40569d6..ea5e04ec302 100644
--- a/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.cpp
@@ -56,6 +56,7 @@ void TKafkaDescribeTopicActor::SendResult(const EKafkaErrors status, const TStri
if (status == EKafkaErrors::NONE_ERROR) {
const auto* protoResponse = dynamic_cast<const Ydb::Topic::DescribeTopicResult*>(&result);
response->Response = *protoResponse;
+ response->PQGroupInfo = PQGroupInfo;
}
TBase::Send(Requester, response.Release());
TBase::Send(TBase::SelfId(), new TEvents::TEvPoison());
@@ -90,6 +91,7 @@ void TKafkaDescribeTopicActor::HandleCacheNavigateResponse(NKikimr::TEvTxProxySc
TBase::Reply(status, ActorContext());
return;
}
+ PQGroupInfo = response.PQGroupInfo;
} else {
Ydb::Scheme::Entry *selfEntry = result.mutable_self();
@@ -183,7 +185,6 @@ void TKafkaDescribeConfigsActor::AddDescribeResponse(
AddConfigEntry(singleConfig, "remote.storage.enable", "false", EKafkaConfigType::BOOLEAN);
AddConfigEntry(singleConfig, "segment.jitter.ms", "0", EKafkaConfigType::LONG);
AddConfigEntry(singleConfig, "local.retention.ms", "-2", EKafkaConfigType::LONG);
- AddConfigEntry(singleConfig, "cleanup.policy", "delete", EKafkaConfigType::LIST);
AddConfigEntry(singleConfig, "flush.ms", "9223372036854775807", EKafkaConfigType::LONG);
AddConfigEntry(singleConfig, "follower.replication.throttled.replicas", "", EKafkaConfigType::LIST);
AddConfigEntry(singleConfig, "compression.lz4.level", "9", EKafkaConfigType::INT);
@@ -224,6 +225,12 @@ void TKafkaDescribeConfigsActor::AddDescribeResponse(
} else {
AddConfigEntry(singleConfig, "retention.bytes", "-1", EKafkaConfigType::LONG);
}
+ if (ev->PQGroupInfo && ev->PQGroupInfo->Description.GetPQTabletConfig().GetEnableCompactification()) {
+ AddConfigEntry(singleConfig, "cleanup.policy", "compact", EKafkaConfigType::LIST);
+ } else {
+ AddConfigEntry(singleConfig, "cleanup.policy", "delete", EKafkaConfigType::LIST);
+ }
+
response->Results.emplace_back(std::move(singleConfig));
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.h b/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.h
index a0683c8d3a5..c3827af0c67 100644
--- a/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.h
@@ -78,6 +78,7 @@ protected:
private:
const TActorId Requester;
const std::shared_ptr<TString> SerializedToken;
+ TIntrusiveConstPtr<NKikimr::NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
};
diff --git a/ydb/core/kafka_proxy/kafka_constants.h b/ydb/core/kafka_proxy/kafka_constants.h
index 74837dc2500..52a3d515622 100644
--- a/ydb/core/kafka_proxy/kafka_constants.h
+++ b/ydb/core/kafka_proxy/kafka_constants.h
@@ -8,9 +8,11 @@ namespace NKafka {
static const TString RETENTION_MS_CONFIG_NAME = "retention.ms";
static const TString RETENTION_BYTES_CONFIG_NAME = "retention.bytes";
static const TString COMPRESSION_TYPE = "compression.type";
-
+ static const TString CLEANUP_POLICY = "cleanup.policy";
+
+
static const ui64 TRANSACTIONAL_ID_EXPIRATION_MS = 7 * 24 * 60 * 60 * 1000; // 7 days
-
+
static const i64 NO_PRODUCER_ID = -1;
static const i16 NO_PRODUCER_EPOCH = -1;
}
diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h
index a6bcced0901..6ecf4eed18a 100644
--- a/ydb/core/kafka_proxy/kafka_events.h
+++ b/ydb/core/kafka_proxy/kafka_events.h
@@ -3,6 +3,7 @@
#include <ydb/library/actors/core/event_local.h>
#include <ydb/core/base/events.h>
#include <ydb/services/persqueue_v1/actors/events.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include "kafka_messages.h"
#include "kafka_producer_instance_id.h"
@@ -293,7 +294,7 @@ struct TEvTopicDescribeResponse : public NActors::TEventLocal<TEvTopicDescribeRe
EKafkaErrors Status;
TString Message;
Ydb::Topic::DescribeTopicResult Response;
-
+ TIntrusiveConstPtr<NKikimr::NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
};
struct TEvAddOffsetsToTxnRequest : public TEventLocal<TEvAddOffsetsToTxnRequest, EvAddOffsetsToTxnRequest> {
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index e6a3823a123..87dd248a251 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -1392,7 +1392,6 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
auto result = pqClient.DescribeTopic("/Root/topic-986-test", describeTopicSettings).GetValueSync();
UNIT_ASSERT(!result.IsSuccess());
}
-
}
Y_UNIT_TEST(CreateTopicsScenarioWithKafkaAuth) {
@@ -1520,6 +1519,85 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
}
} // Y_UNIT_TEST(CreatePartitionsScenario)
+ void RunCreateTopicsWithCleanupPolicy(TInsecureTestServer& testServer, TKafkaTestClient& client) {
+ NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
+
+ TString topic1 = "topic-999-test", topic2 = "topic-998-test";
+
+ {
+ // Creation of two topics
+ auto msg = client.CreateTopics({
+ TTopicConfig(topic1, 12, std::nullopt, std::nullopt, {{"cleanup.policy", "compact"}}),
+ TTopicConfig(topic2, 13, std::nullopt, std::nullopt, {{"cleanup.policy", "delete"}}),
+ TTopicConfig("topic_bad", 13, std::nullopt, std::nullopt, {{"cleanup.policy", "bad"}})
+ });
+ UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 3);
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, NONE_ERROR);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), topic1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Topics[1].ErrorCode, NONE_ERROR);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Topics[1].Name.value(), topic2);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Topics[2].ErrorCode, INVALID_REQUEST);
+ }
+
+ auto getConfigsMap = [&](const auto& describeResult) {
+ THashMap<TString, TDescribeConfigsResponseData::TDescribeConfigsResult::TDescribeConfigsResourceResult> configs;
+ for (const auto& config : describeResult.Configs) {
+ configs[TString(config.Name->data())] = config;
+ }
+ return configs;
+ };
+
+ struct TDescribeTopicResult {
+ TString name;
+ TString policy;
+ };
+
+ auto checkDescribeTopic = [&](const std::vector<TDescribeTopicResult>& topics) {
+ std::vector<TString> topicNames;
+ for (const auto& topic : topics) {
+ topicNames.push_back(topic.name);
+ }
+
+ auto msg = client.DescribeConfigs(topicNames);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), topics.size());
+ for (auto i = 0u; i < topics.size(); ++i) {
+ const auto& res = msg->Results[i];
+ UNIT_ASSERT_VALUES_EQUAL(res.ResourceName.value(), topics[i].name);
+ UNIT_ASSERT_VALUES_EQUAL(res.ErrorCode, NONE_ERROR);
+ UNIT_ASSERT_VALUES_EQUAL_C(getConfigsMap(res).find("cleanup.policy")->second.Value->data(),
+ topics[i].policy, res.ResourceName.value());
+ }
+ };
+
+ checkDescribeTopic({{topic1, "compact"}, {topic2, "delete"}});
+
+ {
+ auto msg = client.AlterConfigs({
+ TTopicConfig(topic1, 12, std::nullopt, std::nullopt, {{"cleanup.policy", "bad"}}),
+ TTopicConfig(topic2, 13, std::nullopt, std::nullopt, {{"cleanup.policy", "compact"}}),
+ });
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, INVALID_REQUEST);
+ checkDescribeTopic({{topic1, "compact"}, {topic2, "compact"}});
+ }
+ {
+ auto msg = client.AlterConfigs({
+ TTopicConfig(topic1, 12, std::nullopt, std::nullopt, {{"cleanup.policy", "delete"}}),
+ TTopicConfig(topic2, 13, std::nullopt, std::nullopt, {{"cleanup.policy", ""}})
+ });
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[1].ErrorCode, INVALID_REQUEST);
+ checkDescribeTopic({{topic1, "delete"}, {topic2, "compact"}});
+ }
+ }
+
+
+ Y_UNIT_TEST(TopicsWithCleaunpPolicyScenario) {
+ TInsecureTestServer testServer("2");
+ TKafkaTestClient client(testServer.Port);
+
+ RunCreateTopicsWithCleanupPolicy(testServer, client);
+ }
+
Y_UNIT_TEST(DescribeConfigsScenario) {
TInsecureTestServer testServer("2");
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index d9085e399e0..71cebffc4c9 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -413,8 +413,11 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont
if (BlobEncoder.StartOffset == BlobEncoder.EndOffset || BlobEncoder.DataKeysBody.size() <= 1) {
return false;
}
-
+ if (Config.GetEnableCompactification()) {
+ return false;
+ }
const auto& partConfig = Config.GetPartitionConfig();
+
const TDuration lifetimeLimit{TDuration::Seconds(partConfig.GetLifetimeSeconds())};
const bool hasStorageLimit = partConfig.HasStorageLimitBytes();
diff --git a/ydb/core/persqueue/partition_blob_encoder.h b/ydb/core/persqueue/partition_blob_encoder.h
index 6a18697a55e..a9abf2e1835 100644
--- a/ydb/core/persqueue/partition_blob_encoder.h
+++ b/ydb/core/persqueue/partition_blob_encoder.h
@@ -97,6 +97,7 @@ struct TPartitionBlobEncoder {
std::deque<TDataKey> DataKeysBody;
TVector<TKeyLevel> DataKeysHead;
std::deque<TDataKey> HeadKeys;
+ ui64 FirstUncompactedOffset = 0;
};
}
diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp
index bcd11d8ed0b..3281bdcf94c 100644
--- a/ydb/core/persqueue/partition_init.cpp
+++ b/ydb/core/persqueue/partition_init.cpp
@@ -325,6 +325,7 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T
Partition()->BlobEncoder.StartOffset = meta.GetStartOffset();
Partition()->BlobEncoder.EndOffset = meta.GetEndOffset();
+ Partition()->BlobEncoder.FirstUncompactedOffset = meta.GetFirstUncompactedOffset();
if (Partition()->BlobEncoder.StartOffset == Partition()->BlobEncoder.EndOffset) {
Partition()->BlobEncoder.NewHead.Offset = Partition()->BlobEncoder.Head.Offset = Partition()->BlobEncoder.EndOffset;
}
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
index cfc1541e777..e16a9290f71 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
@@ -88,6 +88,9 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
partitionConfig->SetMaxWriteInflightSize(90'000'000);
partitionConfig->SetLowWatermark(parameters.lowWatermark);
+ if (parameters.enableCompactificationByKey) {
+ tabletConfig->SetEnableCompactification(true);
+ }
for (auto& u : users) {
if (u.second)
partitionConfig->AddImportantClientId(u.first);
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h
index 1dc0ebd4c2d..e4231252295 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.h
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.h
@@ -260,6 +260,7 @@ struct TTabletPreparationParameters {
TString databasePath{"/Root/PQ"};
TString account{"federationAccount"};
::NKikimrPQ::TPQTabletConfig_EMeteringMode meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY;
+ bool enableCompactificationByKey{false};
};
void PQTabletPrepare(
const TTabletPreparationParameters& parameters,
diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp
index 0bf2957ae98..ee60f887a5d 100644
--- a/ydb/core/persqueue/ut/pq_ut.cpp
+++ b/ydb/core/persqueue/ut/pq_ut.cpp
@@ -1570,7 +1570,37 @@ Y_UNIT_TEST(TestTimeRetention) {
});
}
+Y_UNIT_TEST(TestComactifiedWithRetention) {
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
+ TFinalizer finalizer(tc);
+ activeZone = false;
+ tc.Prepare(dispatchName, setup, activeZone);
+ tc.Runtime->SetScheduledLimit(100);
+
+ TVector<std::pair<ui64, TString>> data;
+ activeZone = PlainOrSoSlow(true, false);
+
+ TString s{32, 'c'};
+ ui32 pp = 8 + 4 + 2 + 9;
+ for (ui32 i = 0; i < 10; ++i) {
+ data.push_back({i + 1, s.substr(pp)});
+ }
+ PQTabletPrepare({.maxCountInPartition=1000, .deleteTime=0, .lowWatermark=100, .enableCompactificationByKey = true}, {}, tc);
+ CmdWrite(0, "sourceid0", data, tc, false, {}, true);
+ CmdWrite(0, "sourceid1", data, tc, false);
+ CmdWrite(0, "sourceid2", data, tc, false);
+ PQGetPartInfo(0, 30, tc);
+ PQTabletPrepare({.maxCountInPartition=1000, .deleteTime=0, .lowWatermark=100, .enableCompactificationByKey = false}, {}, tc);
+ CmdWrite(0, "sourceid3", data, tc, false);
+ CmdWrite(0, "sourceid4", data, tc, false);
+ CmdWrite(0, "sourceid5", data, tc, false);
+ PQGetPartInfo(50, 60, tc);
+ });
+}
Y_UNIT_TEST(TestStorageRetention) {
TTestContext tc;
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 905d59e2ac5..1b87fa6c448 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -33,6 +33,7 @@ message TPartitionMeta {
optional bool SubDomainOutOfSpace = 3 [default = false];
optional TPartitionCounterData CounterData = 4;
optional uint64 EndWriteTimestamp = 5;
+ optional uint64 FirstUncompactedOffset = 6;
}
message TPartitionTxMeta {
@@ -433,6 +434,8 @@ message TPQTabletConfig {
repeated TPartition AllPartitions = 36; // filled by schemeshard
optional TOffloadConfig OffloadConfig = 38;
+
+ optional bool EnableCompactification = 39 [default = false];
}
message THeartbeat {