summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <[email protected]>2022-04-22 21:43:25 +0300
committerIlnaz Nizametdinov <[email protected]>2022-04-22 21:43:25 +0300
commit0d1c4d5c0dee58c162e1c0649f0e5b394a9306b3 (patch)
tree9a872f1be7cb7803efbb679ce43017a022e0e538
parent8f1d749c26573a6a3397dbed564286bd071c69f6 (diff)
Configurable retention period: private api KIKIMR-14776
ref:627ed67f78a6ddd50b99a4f6124409c936ad549c
-rw-r--r--ydb/core/protos/flat_scheme_op.proto1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp9
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream.cpp41
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp7
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/ls_checks.h1
5 files changed, 58 insertions, 1 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index e3b438d69ed..474a0260185 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -729,6 +729,7 @@ message TCdcStreamDescription {
message TCreateCdcStream {
optional string TableName = 1;
optional TCdcStreamDescription StreamDescription = 2;
+ optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default
}
message TAlterCdcStream {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index b21831f033a..a87849ab963 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -591,6 +591,13 @@ TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId opId, const TTx
<< "Invalid stream mode: " << static_cast<ui32>(streamDesc.GetMode()))};
}
+ const auto retentionPeriod = TDuration::Seconds(op.GetRetentionPeriodSeconds());
+ if (retentionPeriod.Seconds() > TSchemeShard::MaxPQLifetimeSeconds) {
+ return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, TStringBuilder()
+ << "Invalid retention period specified: " << retentionPeriod.Seconds()
+ << ", limit: " << TSchemeShard::MaxPQLifetimeSeconds)};
+ }
+
const ui64 aliveStreams = context.SS->GetAliveChildren(tablePath.Base(), NKikimrSchemeOp::EPathTypeCdcStream);
if (aliveStreams + 1 > tablePath.DomainInfo()->GetSchemeLimits().MaxTableCdcStreams) {
return {CreateReject(opId, NKikimrScheme::EStatus::StatusResourceExhausted, TStringBuilder()
@@ -643,7 +650,7 @@ TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId opId, const TTx
pqConfig.SetTopicName(streamName);
pqConfig.SetTopicPath(streamPath.Child("streamImpl").PathString());
auto& partitionConfig = *pqConfig.MutablePartitionConfig();
- partitionConfig.SetLifetimeSeconds(TDuration::Days(1).Seconds());
+ partitionConfig.SetLifetimeSeconds(retentionPeriod.Seconds());
for (const auto& tag : table->KeyColumnIds) {
Y_VERIFY(table->Columns.contains(tag));
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index 78d1ec2bfd9..ee7519f0e59 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -1,4 +1,5 @@
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
+#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
#include <util/string/printf.h>
@@ -62,6 +63,46 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream/streamImpl"), {NLs::PathNotExist});
}
+ Y_UNIT_TEST(RetentionPeriod) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ for (const auto& rp : {TDuration::Hours(12), TDuration::Days(7), TDuration::Days(60)}) {
+ const auto status = rp.Seconds() <= TSchemeShard::MaxPQLifetimeSeconds
+ ? NKikimrScheme::StatusAccepted
+ : NKikimrScheme::StatusInvalidParameter;
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream%lu"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ RetentionPeriodSeconds: %lu
+ )", rp.Seconds(), rp.Seconds()), {status});
+
+ if (status != NKikimrScheme::StatusAccepted) {
+ continue;
+ }
+
+ env.TestWaitNotification(runtime, txId);
+ TestDescribeResult(DescribePrivatePath(runtime, Sprintf("/MyRoot/Table/Stream%lu/streamImpl", rp.Seconds())), {
+ NLs::PathExist,
+ NLs::RetentionPeriod(rp),
+ });
+ }
+ }
+
Y_UNIT_TEST(Negative) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
index d9c663ee54c..cd08f875dd8 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
@@ -699,6 +699,13 @@ TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state) {
};
}
+TCheckFunc RetentionPeriod(const TDuration& value) {
+ return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
+ UNIT_ASSERT_VALUES_EQUAL(value.Seconds(), record.GetPathDescription().GetPersQueueGroup()
+ .GetPQTabletConfig().GetPartitionConfig().GetLifetimeSeconds());
+ };
+}
+
void NoChildren(const NKikimrScheme::TEvDescribeSchemeResult& record) {
ChildrenCount(0)(record);
}
diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
index 60f3960bae6..8e0d5f4b941 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
@@ -120,6 +120,7 @@ namespace NLs {
TCheckFunc StreamMode(NKikimrSchemeOp::ECdcStreamMode mode);
TCheckFunc StreamFormat(NKikimrSchemeOp::ECdcStreamFormat format);
TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state);
+ TCheckFunc RetentionPeriod(const TDuration& value);
TCheckFunc HasBackupInFly(ui64 txId);
void NoBackupInFly(const NKikimrScheme::TEvDescribeSchemeResult& record);