diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-09-14 17:21:05 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-09-14 17:21:05 +0300 |
commit | 459f5cf25ea6d6180fbbb8ef048d73a41876b124 (patch) | |
tree | 71920a04deec8e8aedad58b3ee60041c1dd40fc3 | |
parent | 8bed9e08c47d2f7765bc3637d17cf6be3bead2ed (diff) | |
download | ydb-459f5cf25ea6d6180fbbb8ef048d73a41876b124.tar.gz |
Abortable CreateCdcStream
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp | 23 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_cdc_stream.cpp | 30 |
2 files changed, 41 insertions, 12 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index e7d71e7213..d2a08fc11c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -271,8 +271,9 @@ public: return result; } - void AbortPropose(TOperationContext&) override { - Y_FAIL("no AbortPropose for TNewCdcStream"); + void AbortPropose(TOperationContext& context) override { + LOG_N("TNewCdcStream AbortPropose" + << ": opId# " << OperationId); } void AbortUnsafe(TTxId txId, TOperationContext& context) override { @@ -458,6 +459,10 @@ public: return result; } + auto guard = context.DbGuard(); + context.MemChanges.GrabPath(context.SS, tablePath.Base()->PathId); + context.MemChanges.GrabNewTxState(context.SS, OperationId); + context.DbChanges.PersistTxState(OperationId); Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId)); @@ -485,8 +490,9 @@ public: return result; } - void AbortPropose(TOperationContext&) override { - Y_FAIL("no AbortPropose for TNewCdcStreamAtTable"); + void AbortPropose(TOperationContext& context) override { + LOG_N("TNewCdcStreamAtTable AbortPropose" + << ": opId# " << OperationId); } void AbortUnsafe(TTxId txId, TOperationContext& context) override { @@ -600,13 +606,6 @@ TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId opId, const TTx << "Invalid stream mode: " << static_cast<ui32>(streamDesc.GetMode()))}; } - const auto retentionPeriod = TDuration::Seconds(op.GetRetentionPeriodSeconds()); - if (retentionPeriod.Seconds() > TSchemeShard::MaxPQLifetimeSeconds) { - return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, TStringBuilder() - << "Invalid retention period specified: " << retentionPeriod.Seconds() - << ", limit: " << TSchemeShard::MaxPQLifetimeSeconds)}; - } - const ui64 aliveStreams = context.SS->GetAliveChildren(tablePath.Base(), NKikimrSchemeOp::EPathTypeCdcStream); if (aliveStreams + 1 > tablePath.DomainInfo()->GetSchemeLimits().MaxTableCdcStreams) { return {CreateReject(opId, NKikimrScheme::EStatus::StatusResourceExhausted, TStringBuilder() @@ -666,7 +665,7 @@ TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId opId, const TTx pqConfig.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); auto& partitionConfig = *pqConfig.MutablePartitionConfig(); - partitionConfig.SetLifetimeSeconds(retentionPeriod.Seconds()); + partitionConfig.SetLifetimeSeconds(op.GetRetentionPeriodSeconds()); partitionConfig.SetWriteSpeedInBytesPerSecond(1_MB); // TODO: configurable write speed partitionConfig.SetBurstSize(1_MB); // TODO: configurable burst diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index 1707160fc0..8725a18e59 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -557,6 +557,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { limits.MaxTableCdcStreams = 2; SetSchemeshardSchemaLimits(runtime, limits); + ui32 nStreams = 0; + for (ui32 i = 0; i <= limits.MaxTableCdcStreams; ++i) { const auto status = i < limits.MaxTableCdcStreams ? NKikimrScheme::StatusAccepted @@ -571,6 +573,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } )", i), {status}); env.TestWaitNotification(runtime, txId); + + if (status == NKikimrScheme::StatusAccepted) { + nStreams++; + } } limits.MaxChildrenInDir = limits.MaxTableCdcStreams + 1 + 1 /* for index */; @@ -591,6 +597,30 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } )", i), {status}); env.TestWaitNotification(runtime, txId); + + if (status == NKikimrScheme::StatusAccepted) { + nStreams++; + } + } + + limits = TSchemeLimits(); + limits.MaxPQPartitions = 3; + SetSchemeshardSchemaLimits(runtime, limits); + + for (ui32 i = nStreams; i <= limits.MaxPQPartitions; ++i) { + const auto status = i < limits.MaxPQPartitions + ? NKikimrScheme::StatusAccepted + : NKikimrScheme::StatusResourceExhausted; + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamDescription { + Name: "Stream%u" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )", i), {status}); + env.TestWaitNotification(runtime, txId); } } |