aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-09-14 17:21:05 +0300
committerilnaz <ilnaz@ydb.tech>2022-09-14 17:21:05 +0300
commit459f5cf25ea6d6180fbbb8ef048d73a41876b124 (patch)
tree71920a04deec8e8aedad58b3ee60041c1dd40fc3
parent8bed9e08c47d2f7765bc3637d17cf6be3bead2ed (diff)
downloadydb-459f5cf25ea6d6180fbbb8ef048d73a41876b124.tar.gz
Abortable CreateCdcStream
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp23
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream.cpp30
2 files changed, 41 insertions, 12 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index e7d71e7213..d2a08fc11c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -271,8 +271,9 @@ public:
return result;
}
- void AbortPropose(TOperationContext&) override {
- Y_FAIL("no AbortPropose for TNewCdcStream");
+ void AbortPropose(TOperationContext& context) override {
+ LOG_N("TNewCdcStream AbortPropose"
+ << ": opId# " << OperationId);
}
void AbortUnsafe(TTxId txId, TOperationContext& context) override {
@@ -458,6 +459,10 @@ public:
return result;
}
+ auto guard = context.DbGuard();
+ context.MemChanges.GrabPath(context.SS, tablePath.Base()->PathId);
+ context.MemChanges.GrabNewTxState(context.SS, OperationId);
+
context.DbChanges.PersistTxState(OperationId);
Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId));
@@ -485,8 +490,9 @@ public:
return result;
}
- void AbortPropose(TOperationContext&) override {
- Y_FAIL("no AbortPropose for TNewCdcStreamAtTable");
+ void AbortPropose(TOperationContext& context) override {
+ LOG_N("TNewCdcStreamAtTable AbortPropose"
+ << ": opId# " << OperationId);
}
void AbortUnsafe(TTxId txId, TOperationContext& context) override {
@@ -600,13 +606,6 @@ TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId opId, const TTx
<< "Invalid stream mode: " << static_cast<ui32>(streamDesc.GetMode()))};
}
- const auto retentionPeriod = TDuration::Seconds(op.GetRetentionPeriodSeconds());
- if (retentionPeriod.Seconds() > TSchemeShard::MaxPQLifetimeSeconds) {
- return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, TStringBuilder()
- << "Invalid retention period specified: " << retentionPeriod.Seconds()
- << ", limit: " << TSchemeShard::MaxPQLifetimeSeconds)};
- }
-
const ui64 aliveStreams = context.SS->GetAliveChildren(tablePath.Base(), NKikimrSchemeOp::EPathTypeCdcStream);
if (aliveStreams + 1 > tablePath.DomainInfo()->GetSchemeLimits().MaxTableCdcStreams) {
return {CreateReject(opId, NKikimrScheme::EStatus::StatusResourceExhausted, TStringBuilder()
@@ -666,7 +665,7 @@ TVector<ISubOperationBase::TPtr> CreateNewCdcStream(TOperationId opId, const TTx
pqConfig.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
auto& partitionConfig = *pqConfig.MutablePartitionConfig();
- partitionConfig.SetLifetimeSeconds(retentionPeriod.Seconds());
+ partitionConfig.SetLifetimeSeconds(op.GetRetentionPeriodSeconds());
partitionConfig.SetWriteSpeedInBytesPerSecond(1_MB); // TODO: configurable write speed
partitionConfig.SetBurstSize(1_MB); // TODO: configurable burst
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index 1707160fc0..8725a18e59 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -557,6 +557,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
limits.MaxTableCdcStreams = 2;
SetSchemeshardSchemaLimits(runtime, limits);
+ ui32 nStreams = 0;
+
for (ui32 i = 0; i <= limits.MaxTableCdcStreams; ++i) {
const auto status = i < limits.MaxTableCdcStreams
? NKikimrScheme::StatusAccepted
@@ -571,6 +573,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
)", i), {status});
env.TestWaitNotification(runtime, txId);
+
+ if (status == NKikimrScheme::StatusAccepted) {
+ nStreams++;
+ }
}
limits.MaxChildrenInDir = limits.MaxTableCdcStreams + 1 + 1 /* for index */;
@@ -591,6 +597,30 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
)", i), {status});
env.TestWaitNotification(runtime, txId);
+
+ if (status == NKikimrScheme::StatusAccepted) {
+ nStreams++;
+ }
+ }
+
+ limits = TSchemeLimits();
+ limits.MaxPQPartitions = 3;
+ SetSchemeshardSchemaLimits(runtime, limits);
+
+ for (ui32 i = nStreams; i <= limits.MaxPQPartitions; ++i) {
+ const auto status = i < limits.MaxPQPartitions
+ ? NKikimrScheme::StatusAccepted
+ : NKikimrScheme::StatusResourceExhausted;
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream%u"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )", i), {status});
+ env.TestWaitNotification(runtime, txId);
}
}