diff options
| -rw-r--r-- | ydb/core/kafka_proxy/actors/control_plane_common.h | 10 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp | 14 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp | 40 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.cpp | 9 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.h | 1 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/kafka_constants.h | 6 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/kafka_events.h | 3 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/ut/ut_protocol.cpp | 80 | ||||
| -rw-r--r-- | ydb/core/persqueue/partition.cpp | 5 | ||||
| -rw-r--r-- | ydb/core/persqueue/partition_blob_encoder.h | 1 | ||||
| -rw-r--r-- | ydb/core/persqueue/partition_init.cpp | 1 | ||||
| -rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.cpp | 3 | ||||
| -rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.h | 1 | ||||
| -rw-r--r-- | ydb/core/persqueue/ut/pq_ut.cpp | 30 | ||||
| -rw-r--r-- | ydb/core/protos/pqconfig.proto | 3 |
15 files changed, 197 insertions, 10 deletions
diff --git a/ydb/core/kafka_proxy/actors/control_plane_common.h b/ydb/core/kafka_proxy/actors/control_plane_common.h index f92bba2cc08..166fa895fec 100644 --- a/ydb/core/kafka_proxy/actors/control_plane_common.h +++ b/ydb/core/kafka_proxy/actors/control_plane_common.h @@ -109,6 +109,7 @@ inline std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ValidateTo } } + template<class T> inline std::unordered_set<TString> ExtractDuplicates( std::vector<T>& source, @@ -377,4 +378,13 @@ private: } }; +enum class ECleanupPolicy { + DELETE, + COMPACT, + UNKNOWN +}; + +std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ConvertCleanupPolicy(const std::optional<TString>& configValue, + std::optional<ECleanupPolicy>& cleanupPolicy); + } //namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp index 6b7c86c9995..a3b79c9ede1 100644 --- a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp @@ -39,7 +39,8 @@ public: TString topicPath, TString databaseName, std::optional<ui64> retentionMs, - std::optional<ui64> retentionBytes) + std::optional<ui64> retentionBytes, + std::optional<ECleanupPolicy> cleanupPolicy) : TAlterTopicActor<TAlterConfigsActor, TKafkaAlterConfigsRequest>( requester, userToken, @@ -47,6 +48,7 @@ public: databaseName) , RetentionMs(retentionMs) , RetentionBytes(retentionBytes) + , CleanupPolicy(cleanupPolicy) { KAFKA_LOG_D("Alter configs actor. DatabaseName: " << databaseName << ". TopicPath: " << TopicPath); }; @@ -72,11 +74,15 @@ public: if (RetentionBytes.has_value()) { partitionConfig->SetStorageLimitBytes(RetentionBytes.value()); } + if (CleanupPolicy.has_value()) { + groupConfig.MutablePQTabletConfig()->SetEnableCompactification(CleanupPolicy.value() == ECleanupPolicy::COMPACT); + } } private: std::optional<ui64> RetentionMs; std::optional<ui64> RetentionBytes; + std::optional<ECleanupPolicy> CleanupPolicy; }; NActors::IActor* CreateKafkaAlterConfigsActor( @@ -116,6 +122,7 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) { std::optional<TString> retentionMs; std::optional<TString> retentionBytes; + std::optional<ECleanupPolicy> cleanupPolicy; std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> unsupportedConfigResponse; @@ -129,6 +136,8 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) { retentionMs = config.Value; } else if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) { retentionBytes = config.Value; + } else if (config.Name.value() == CLEANUP_POLICY) { + unsupportedConfigResponse = ConvertCleanupPolicy(config.Value, cleanupPolicy); } } @@ -150,7 +159,8 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) { resource.ResourceName.value(), Context->DatabasePath, convertedRetentions.Ms, - convertedRetentions.Bytes + convertedRetentions.Bytes, + cleanupPolicy )); InflyTopics++; diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp index cb6062d351b..968c16239da 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp @@ -11,6 +11,26 @@ namespace NKafka { +std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ConvertCleanupPolicy( + const std::optional<TString>& configValue, std::optional<ECleanupPolicy>& cleanupPolicy +) { + if (configValue.value_or("") == "delete") { + cleanupPolicy = ECleanupPolicy::DELETE; + return std::nullopt; + } else if (configValue.value_or("") == "compact") { + cleanupPolicy = ECleanupPolicy::COMPACT; + return std::nullopt; + } + auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>(); + result->Status = EKafkaErrors::INVALID_REQUEST; + result->Message = TStringBuilder() + << "Topic-level config '" + << CLEANUP_POLICY + << "' has invalid/unsupported value: " + << configValue.value_or(""); + return result; +} + class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreateTopicActor, TKafkaTopicRequestCtx> { using TBase = NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreateTopicActor, TKafkaTopicRequestCtx>; public: @@ -22,7 +42,8 @@ public: TString databaseName, ui32 partitionsNumber, std::optional<ui64> retentionMs, - std::optional<ui64> retentionBytes) + std::optional<ui64> retentionBytes, + std::optional<ECleanupPolicy> cleanupPolicy) : TBase(new TKafkaTopicRequestCtx( userToken, topicPath, @@ -36,6 +57,7 @@ public: , PartionsNumber(partitionsNumber) , RetentionMs(retentionMs) , RetentionBytes(retentionBytes) + , CleanupPolicy(cleanupPolicy) { KAFKA_LOG_D(LogMessage(databaseName)); }; @@ -62,6 +84,9 @@ public: auto pqDescr = modifyScheme.MutableCreatePersQueueGroup(); pqDescr->SetPartitionPerTablet(1); + if (CleanupPolicy.value_or(ECleanupPolicy::UNKNOWN) == ECleanupPolicy::COMPACT) { + pqDescr->MutablePQTabletConfig()->SetEnableCompactification(true); + } Ydb::Topic::CreateTopicRequest topicRequest; topicRequest.mutable_partitioning_settings()->set_min_active_partitions(PartionsNumber); @@ -110,6 +135,7 @@ private: const ui32 PartionsNumber; std::optional<ui64> RetentionMs; std::optional<ui64> RetentionBytes; + std::optional<ECleanupPolicy> CleanupPolicy; TStringBuilder LogMessage(TString& databaseName) { TStringBuilder stringBuilder = TStringBuilder() @@ -122,6 +148,9 @@ private: if (RetentionBytes.has_value()) { stringBuilder << ". RetentionBytes: " << RetentionBytes.value(); } + if (CleanupPolicy.has_value() && CleanupPolicy.value() == ECleanupPolicy::COMPACT) { + stringBuilder << ". CleaunpPolicy: compact"; + } return stringBuilder; } }; @@ -163,6 +192,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { std::optional<TString> retentionMs; std::optional<TString> retentionBytes; + std::optional<ECleanupPolicy> cleanupPolicy; std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> unsupportedConfigResponse; @@ -175,6 +205,11 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { retentionMs = config.Value; } else if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) { retentionBytes = config.Value; + } else if (config.Name.value() == CLEANUP_POLICY) { + unsupportedConfigResponse = ConvertCleanupPolicy(config.Value, cleanupPolicy); + if (unsupportedConfigResponse.has_value()) { + break; + } } } @@ -202,7 +237,8 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { Context->DatabasePath, topic.NumPartitions, convertedRetentions.Ms, - convertedRetentions.Bytes + convertedRetentions.Bytes, + cleanupPolicy )); InflyTopics++; diff --git a/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.cpp index 824a40569d6..ea5e04ec302 100644 --- a/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.cpp @@ -56,6 +56,7 @@ void TKafkaDescribeTopicActor::SendResult(const EKafkaErrors status, const TStri if (status == EKafkaErrors::NONE_ERROR) { const auto* protoResponse = dynamic_cast<const Ydb::Topic::DescribeTopicResult*>(&result); response->Response = *protoResponse; + response->PQGroupInfo = PQGroupInfo; } TBase::Send(Requester, response.Release()); TBase::Send(TBase::SelfId(), new TEvents::TEvPoison()); @@ -90,6 +91,7 @@ void TKafkaDescribeTopicActor::HandleCacheNavigateResponse(NKikimr::TEvTxProxySc TBase::Reply(status, ActorContext()); return; } + PQGroupInfo = response.PQGroupInfo; } else { Ydb::Scheme::Entry *selfEntry = result.mutable_self(); @@ -183,7 +185,6 @@ void TKafkaDescribeConfigsActor::AddDescribeResponse( AddConfigEntry(singleConfig, "remote.storage.enable", "false", EKafkaConfigType::BOOLEAN); AddConfigEntry(singleConfig, "segment.jitter.ms", "0", EKafkaConfigType::LONG); AddConfigEntry(singleConfig, "local.retention.ms", "-2", EKafkaConfigType::LONG); - AddConfigEntry(singleConfig, "cleanup.policy", "delete", EKafkaConfigType::LIST); AddConfigEntry(singleConfig, "flush.ms", "9223372036854775807", EKafkaConfigType::LONG); AddConfigEntry(singleConfig, "follower.replication.throttled.replicas", "", EKafkaConfigType::LIST); AddConfigEntry(singleConfig, "compression.lz4.level", "9", EKafkaConfigType::INT); @@ -224,6 +225,12 @@ void TKafkaDescribeConfigsActor::AddDescribeResponse( } else { AddConfigEntry(singleConfig, "retention.bytes", "-1", EKafkaConfigType::LONG); } + if (ev->PQGroupInfo && ev->PQGroupInfo->Description.GetPQTabletConfig().GetEnableCompactification()) { + AddConfigEntry(singleConfig, "cleanup.policy", "compact", EKafkaConfigType::LIST); + } else { + AddConfigEntry(singleConfig, "cleanup.policy", "delete", EKafkaConfigType::LIST); + } + response->Results.emplace_back(std::move(singleConfig)); } diff --git a/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.h b/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.h index a0683c8d3a5..c3827af0c67 100644 --- a/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.h @@ -78,6 +78,7 @@ protected: private: const TActorId Requester; const std::shared_ptr<TString> SerializedToken; + TIntrusiveConstPtr<NKikimr::NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo; }; diff --git a/ydb/core/kafka_proxy/kafka_constants.h b/ydb/core/kafka_proxy/kafka_constants.h index 74837dc2500..52a3d515622 100644 --- a/ydb/core/kafka_proxy/kafka_constants.h +++ b/ydb/core/kafka_proxy/kafka_constants.h @@ -8,9 +8,11 @@ namespace NKafka { static const TString RETENTION_MS_CONFIG_NAME = "retention.ms"; static const TString RETENTION_BYTES_CONFIG_NAME = "retention.bytes"; static const TString COMPRESSION_TYPE = "compression.type"; - + static const TString CLEANUP_POLICY = "cleanup.policy"; + + static const ui64 TRANSACTIONAL_ID_EXPIRATION_MS = 7 * 24 * 60 * 60 * 1000; // 7 days - + static const i64 NO_PRODUCER_ID = -1; static const i16 NO_PRODUCER_EPOCH = -1; } diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index a6bcced0901..6ecf4eed18a 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -3,6 +3,7 @@ #include <ydb/library/actors/core/event_local.h> #include <ydb/core/base/events.h> #include <ydb/services/persqueue_v1/actors/events.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include "kafka_messages.h" #include "kafka_producer_instance_id.h" @@ -293,7 +294,7 @@ struct TEvTopicDescribeResponse : public NActors::TEventLocal<TEvTopicDescribeRe EKafkaErrors Status; TString Message; Ydb::Topic::DescribeTopicResult Response; - + TIntrusiveConstPtr<NKikimr::NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo; }; struct TEvAddOffsetsToTxnRequest : public TEventLocal<TEvAddOffsetsToTxnRequest, EvAddOffsetsToTxnRequest> { diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index e6a3823a123..87dd248a251 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -1392,7 +1392,6 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { auto result = pqClient.DescribeTopic("/Root/topic-986-test", describeTopicSettings).GetValueSync(); UNIT_ASSERT(!result.IsSuccess()); } - } Y_UNIT_TEST(CreateTopicsScenarioWithKafkaAuth) { @@ -1520,6 +1519,85 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } } // Y_UNIT_TEST(CreatePartitionsScenario) + void RunCreateTopicsWithCleanupPolicy(TInsecureTestServer& testServer, TKafkaTestClient& client) { + NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); + + TString topic1 = "topic-999-test", topic2 = "topic-998-test"; + + { + // Creation of two topics + auto msg = client.CreateTopics({ + TTopicConfig(topic1, 12, std::nullopt, std::nullopt, {{"cleanup.policy", "compact"}}), + TTopicConfig(topic2, 13, std::nullopt, std::nullopt, {{"cleanup.policy", "delete"}}), + TTopicConfig("topic_bad", 13, std::nullopt, std::nullopt, {{"cleanup.policy", "bad"}}) + }); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 3); + + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, NONE_ERROR); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), topic1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[1].ErrorCode, NONE_ERROR); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[1].Name.value(), topic2); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[2].ErrorCode, INVALID_REQUEST); + } + + auto getConfigsMap = [&](const auto& describeResult) { + THashMap<TString, TDescribeConfigsResponseData::TDescribeConfigsResult::TDescribeConfigsResourceResult> configs; + for (const auto& config : describeResult.Configs) { + configs[TString(config.Name->data())] = config; + } + return configs; + }; + + struct TDescribeTopicResult { + TString name; + TString policy; + }; + + auto checkDescribeTopic = [&](const std::vector<TDescribeTopicResult>& topics) { + std::vector<TString> topicNames; + for (const auto& topic : topics) { + topicNames.push_back(topic.name); + } + + auto msg = client.DescribeConfigs(topicNames); + UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), topics.size()); + for (auto i = 0u; i < topics.size(); ++i) { + const auto& res = msg->Results[i]; + UNIT_ASSERT_VALUES_EQUAL(res.ResourceName.value(), topics[i].name); + UNIT_ASSERT_VALUES_EQUAL(res.ErrorCode, NONE_ERROR); + UNIT_ASSERT_VALUES_EQUAL_C(getConfigsMap(res).find("cleanup.policy")->second.Value->data(), + topics[i].policy, res.ResourceName.value()); + } + }; + + checkDescribeTopic({{topic1, "compact"}, {topic2, "delete"}}); + + { + auto msg = client.AlterConfigs({ + TTopicConfig(topic1, 12, std::nullopt, std::nullopt, {{"cleanup.policy", "bad"}}), + TTopicConfig(topic2, 13, std::nullopt, std::nullopt, {{"cleanup.policy", "compact"}}), + }); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, INVALID_REQUEST); + checkDescribeTopic({{topic1, "compact"}, {topic2, "compact"}}); + } + { + auto msg = client.AlterConfigs({ + TTopicConfig(topic1, 12, std::nullopt, std::nullopt, {{"cleanup.policy", "delete"}}), + TTopicConfig(topic2, 13, std::nullopt, std::nullopt, {{"cleanup.policy", ""}}) + }); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[1].ErrorCode, INVALID_REQUEST); + checkDescribeTopic({{topic1, "delete"}, {topic2, "compact"}}); + } + } + + + Y_UNIT_TEST(TopicsWithCleaunpPolicyScenario) { + TInsecureTestServer testServer("2"); + TKafkaTestClient client(testServer.Port); + + RunCreateTopicsWithCleanupPolicy(testServer, client); + } + Y_UNIT_TEST(DescribeConfigsScenario) { TInsecureTestServer testServer("2"); diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index d9085e399e0..71cebffc4c9 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -413,8 +413,11 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont if (BlobEncoder.StartOffset == BlobEncoder.EndOffset || BlobEncoder.DataKeysBody.size() <= 1) { return false; } - + if (Config.GetEnableCompactification()) { + return false; + } const auto& partConfig = Config.GetPartitionConfig(); + const TDuration lifetimeLimit{TDuration::Seconds(partConfig.GetLifetimeSeconds())}; const bool hasStorageLimit = partConfig.HasStorageLimitBytes(); diff --git a/ydb/core/persqueue/partition_blob_encoder.h b/ydb/core/persqueue/partition_blob_encoder.h index 6a18697a55e..a9abf2e1835 100644 --- a/ydb/core/persqueue/partition_blob_encoder.h +++ b/ydb/core/persqueue/partition_blob_encoder.h @@ -97,6 +97,7 @@ struct TPartitionBlobEncoder { std::deque<TDataKey> DataKeysBody; TVector<TKeyLevel> DataKeysHead; std::deque<TDataKey> HeadKeys; + ui64 FirstUncompactedOffset = 0; }; } diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index bcd11d8ed0b..3281bdcf94c 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -325,6 +325,7 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T Partition()->BlobEncoder.StartOffset = meta.GetStartOffset(); Partition()->BlobEncoder.EndOffset = meta.GetEndOffset(); + Partition()->BlobEncoder.FirstUncompactedOffset = meta.GetFirstUncompactedOffset(); if (Partition()->BlobEncoder.StartOffset == Partition()->BlobEncoder.EndOffset) { Partition()->BlobEncoder.NewHead.Offset = Partition()->BlobEncoder.Head.Offset = Partition()->BlobEncoder.EndOffset; } diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index cfc1541e777..e16a9290f71 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -88,6 +88,9 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters, partitionConfig->SetMaxWriteInflightSize(90'000'000); partitionConfig->SetLowWatermark(parameters.lowWatermark); + if (parameters.enableCompactificationByKey) { + tabletConfig->SetEnableCompactification(true); + } for (auto& u : users) { if (u.second) partitionConfig->AddImportantClientId(u.first); diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index 1dc0ebd4c2d..e4231252295 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -260,6 +260,7 @@ struct TTabletPreparationParameters { TString databasePath{"/Root/PQ"}; TString account{"federationAccount"}; ::NKikimrPQ::TPQTabletConfig_EMeteringMode meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY; + bool enableCompactificationByKey{false}; }; void PQTabletPrepare( const TTabletPreparationParameters& parameters, diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index 0bf2957ae98..ee60f887a5d 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -1570,7 +1570,37 @@ Y_UNIT_TEST(TestTimeRetention) { }); } +Y_UNIT_TEST(TestComactifiedWithRetention) { + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { + TFinalizer finalizer(tc); + activeZone = false; + tc.Prepare(dispatchName, setup, activeZone); + tc.Runtime->SetScheduledLimit(100); + + TVector<std::pair<ui64, TString>> data; + activeZone = PlainOrSoSlow(true, false); + + TString s{32, 'c'}; + ui32 pp = 8 + 4 + 2 + 9; + for (ui32 i = 0; i < 10; ++i) { + data.push_back({i + 1, s.substr(pp)}); + } + PQTabletPrepare({.maxCountInPartition=1000, .deleteTime=0, .lowWatermark=100, .enableCompactificationByKey = true}, {}, tc); + CmdWrite(0, "sourceid0", data, tc, false, {}, true); + CmdWrite(0, "sourceid1", data, tc, false); + CmdWrite(0, "sourceid2", data, tc, false); + PQGetPartInfo(0, 30, tc); + PQTabletPrepare({.maxCountInPartition=1000, .deleteTime=0, .lowWatermark=100, .enableCompactificationByKey = false}, {}, tc); + CmdWrite(0, "sourceid3", data, tc, false); + CmdWrite(0, "sourceid4", data, tc, false); + CmdWrite(0, "sourceid5", data, tc, false); + PQGetPartInfo(50, 60, tc); + }); +} Y_UNIT_TEST(TestStorageRetention) { TTestContext tc; diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 905d59e2ac5..1b87fa6c448 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -33,6 +33,7 @@ message TPartitionMeta { optional bool SubDomainOutOfSpace = 3 [default = false]; optional TPartitionCounterData CounterData = 4; optional uint64 EndWriteTimestamp = 5; + optional uint64 FirstUncompactedOffset = 6; } message TPartitionTxMeta { @@ -433,6 +434,8 @@ message TPQTabletConfig { repeated TPartition AllPartitions = 36; // filled by schemeshard optional TOffloadConfig OffloadConfig = 38; + + optional bool EnableCompactification = 39 [default = false]; } message THeartbeat { |
