diff options
| author | Ilnaz Nizametdinov <[email protected]> | 2022-04-22 21:43:25 +0300 |
|---|---|---|
| committer | Ilnaz Nizametdinov <[email protected]> | 2022-04-22 21:43:25 +0300 |
| commit | 0d1c4d5c0dee58c162e1c0649f0e5b394a9306b3 (patch) | |
| tree | 9a872f1be7cb7803efbb679ce43017a022e0e538 | |
| parent | 8f1d749c26573a6a3397dbed564286bd071c69f6 (diff) | |
Configurable retention period: private api KIKIMR-14776
ref:627ed67f78a6ddd50b99a4f6124409c936ad549c
| -rw-r--r-- | ydb/core/protos/flat_scheme_op.proto | 1 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp | 9 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/ut_cdc_stream.cpp | 41 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp | 7 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/ut_helpers/ls_checks.h | 1 |
5 files changed, 58 insertions, 1 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index e3b438d69ed..474a0260185 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -729,6 +729,7 @@ message TCdcStreamDescription { message TCreateCdcStream { optional string TableName = 1; optional TCdcStreamDescription StreamDescription = 2; + optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default } message TAlterCdcStream { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index b21831f033a..a87849ab963 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -591,6 +591,13 @@ TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId opId, const TTx << "Invalid stream mode: " << static_cast<ui32>(streamDesc.GetMode()))}; } + const auto retentionPeriod = TDuration::Seconds(op.GetRetentionPeriodSeconds()); + if (retentionPeriod.Seconds() > TSchemeShard::MaxPQLifetimeSeconds) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, TStringBuilder() + << "Invalid retention period specified: " << retentionPeriod.Seconds() + << ", limit: " << TSchemeShard::MaxPQLifetimeSeconds)}; + } + const ui64 aliveStreams = context.SS->GetAliveChildren(tablePath.Base(), NKikimrSchemeOp::EPathTypeCdcStream); if (aliveStreams + 1 > tablePath.DomainInfo()->GetSchemeLimits().MaxTableCdcStreams) { return {CreateReject(opId, NKikimrScheme::EStatus::StatusResourceExhausted, TStringBuilder() @@ -643,7 +650,7 @@ TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId opId, const TTx pqConfig.SetTopicName(streamName); pqConfig.SetTopicPath(streamPath.Child("streamImpl").PathString()); auto& partitionConfig = *pqConfig.MutablePartitionConfig(); - partitionConfig.SetLifetimeSeconds(TDuration::Days(1).Seconds()); + partitionConfig.SetLifetimeSeconds(retentionPeriod.Seconds()); for (const auto& tag : table->KeyColumnIds) { Y_VERIFY(table->Columns.contains(tag)); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index 78d1ec2bfd9..ee7519f0e59 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -1,4 +1,5 @@ #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> +#include <ydb/core/tx/schemeshard/schemeshard_impl.h> #include <util/string/printf.h> @@ -62,6 +63,46 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream/streamImpl"), {NLs::PathNotExist}); } + Y_UNIT_TEST(RetentionPeriod) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + for (const auto& rp : {TDuration::Hours(12), TDuration::Days(7), TDuration::Days(60)}) { + const auto status = rp.Seconds() <= TSchemeShard::MaxPQLifetimeSeconds + ? NKikimrScheme::StatusAccepted + : NKikimrScheme::StatusInvalidParameter; + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamDescription { + Name: "Stream%lu" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + RetentionPeriodSeconds: %lu + )", rp.Seconds(), rp.Seconds()), {status}); + + if (status != NKikimrScheme::StatusAccepted) { + continue; + } + + env.TestWaitNotification(runtime, txId); + TestDescribeResult(DescribePrivatePath(runtime, Sprintf("/MyRoot/Table/Stream%lu/streamImpl", rp.Seconds())), { + NLs::PathExist, + NLs::RetentionPeriod(rp), + }); + } + } + Y_UNIT_TEST(Negative) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index d9c663ee54c..cd08f875dd8 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -699,6 +699,13 @@ TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state) { }; } +TCheckFunc RetentionPeriod(const TDuration& value) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(value.Seconds(), record.GetPathDescription().GetPersQueueGroup() + .GetPQTabletConfig().GetPartitionConfig().GetLifetimeSeconds()); + }; +} + void NoChildren(const NKikimrScheme::TEvDescribeSchemeResult& record) { ChildrenCount(0)(record); } diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h index 60f3960bae6..8e0d5f4b941 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h @@ -120,6 +120,7 @@ namespace NLs { TCheckFunc StreamMode(NKikimrSchemeOp::ECdcStreamMode mode); TCheckFunc StreamFormat(NKikimrSchemeOp::ECdcStreamFormat format); TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state); + TCheckFunc RetentionPeriod(const TDuration& value); TCheckFunc HasBackupInFly(ui64 txId); void NoBackupInFly(const NKikimrScheme::TEvDescribeSchemeResult& record); |
