aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorandrew-rykov <arykov@ydb.tech>2022-08-12 13:10:40 +0300
committerandrew-rykov <arykov@ydb.tech>2022-08-12 13:10:40 +0300
commit4e6523601af8960a820bdd1cf3f3f0c3e1c0929a (patch)
treefbd6deb1a80b4372a8906b098ccea6062c95982b
parent31d848f5b32d3baffd1209315c00234e753a755b (diff)
downloadydb-4e6523601af8960a820bdd1cf3f3f0c3e1c0929a.tar.gz
limit infly operations
-rw-r--r--ydb/core/protos/config.proto9
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_bsv.cpp5
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp11
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp5
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_fs.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp10
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_kesus.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_subdomain.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_user_attrs.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_bsv.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_extsubdomain.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_fs.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp14
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_kesus.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp10
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_rtmr.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_solomon.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp9
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_extsubdomain.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp5
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp5
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp10
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_subdomain.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_mkdir.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp44
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h9
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_tx_infly.h8
-rw-r--r--ydb/core/tx/schemeshard/ut_split_merge.cpp82
62 files changed, 415 insertions, 17 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 7b066290da..a77dff0c1e 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -6,6 +6,7 @@ import "ydb/core/protos/blobstorage_vdisk_config.proto";
import "ydb/core/protos/blobstorage_pdisk_config.proto";
import "ydb/core/protos/cms.proto";
import "ydb/core/protos/config_units.proto";
+import "ydb/core/protos/counters_schemeshard.proto";
import "ydb/core/protos/drivemodel.proto";
import "ydb/core/protos/flat_scheme_op.proto";
import "ydb/core/protos/http_config.proto";
@@ -1436,6 +1437,12 @@ message TDataShardConfig {
}
message TSchemeShardConfig {
+ message TInFlightCounterConfig {
+ optional NKikimr.NSchemeShard.ESimpleCounters Type = 1;
+ // after this amount scheme shard begin to abort the operations
+ // to disable set to 0
+ optional uint32 InFlightLimit = 2 [default = 10000];
+ }
// after this amount of time we forcely write full stats to local DB
// to disable set to 0
optional uint32 StatsBatchTimeoutMs = 1 [default = 100];
@@ -1445,6 +1452,8 @@ message TSchemeShardConfig {
optional uint32 StatsMaxBatchSize = 2 [default = 100];
optional uint32 StatsMaxExecuteMs = 3 [default = 10];
+
+ repeated TInFlightCounterConfig InFlightCounterConfig = 4;
}
message TCompactionConfig {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_bsv.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_bsv.cpp
index 899be0d8e9..12e0ae984e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_bsv.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_bsv.cpp
@@ -622,6 +622,11 @@ public:
volume->ForgetAlter();
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterBlockStoreVolume, errStr))
+ {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
// Increase in occupied space is applied immediately
domainDir->ChangeVolumeSpaceBegin(newVolumeSpace, oldVolumeSpace);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
index b8c2dbc973..ab296763d3 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
@@ -172,6 +172,12 @@ public:
return result;
}
}
+ TString errStr;
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterCdcStream, errStr))
+ {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
auto guard = context.DbGuard();
context.DbChanges.PersistAlterCdcStream(streamPath.Base()->PathId);
@@ -393,6 +399,11 @@ public:
result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterCdcStreamAtTable, errStr))
+ {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
context.DbChanges.PersistTxState(OperationId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp
index dd3ad4bfe4..bef536e106 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp
@@ -382,6 +382,11 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterExtSubDomain, errStr))
+ {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
if (settings.HasDeclaredSchemeQuotas()) {
alterData->SetDeclaredSchemeQuotas(settings.GetDeclaredSchemeQuotas());
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_fs.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_fs.cpp
index 64a1cb8ba4..b602718eb5 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_fs.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_fs.cpp
@@ -404,6 +404,10 @@ THolder<TProposeResponse> TAlterFileStore::Propose(
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterFileStore, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
fs->PrepareAlter(*alterConfig);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp
index c4c3261740..25aa7885e4 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp
@@ -205,10 +205,14 @@ public:
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
- TString errMsg;
+ TString errStr;
- if (!context.SS->CheckLocks(parentPath.Base()->PathId, Transaction, errMsg)) {
- result->SetError(NKikimrScheme::StatusMultipleModifications, errMsg);
+ if (!context.SS->CheckLocks(parentPath.Base()->PathId, Transaction, errStr)) {
+ result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
+ return result;
+ }
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterTableIndex, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
return result;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_kesus.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_kesus.cpp
index 33240bada8..4bff97af01 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_kesus.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_kesus.cpp
@@ -361,6 +361,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterKesus, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
kesus->AlterConfig.Reset(new Ydb::Coordination::Config);
kesus->AlterConfig->CopyFrom(kesus->Config);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp
index c94721a26c..d8423e390e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp
@@ -669,6 +669,10 @@ public:
result->SetError(status, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterOlapStore, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
NIceDb::TNiceDb db(context.GetDB());
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
index 14e1114e85..75b65912fe 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
@@ -496,6 +496,10 @@ public:
result->SetError(status, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterColumnTable, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
Y_VERIFY(storeInfo->ColumnTables.contains(path->PathId));
storeInfo->ColumnTablesUnderOperation.insert(path->PathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
index da870c9a5b..52ccd544ba 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
@@ -589,6 +589,10 @@ public:
"Unable to construct channel binding for PQ with the storage pool");
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterPQGroup, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
// This channel bindings are for PersQueue shards. They either use
// explicit channel profiles, or reuse channel profile above.
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp
index b7c1275ff8..9aa2d32328 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp
@@ -291,6 +291,10 @@ public:
result->SetError(NKikimrScheme::StatusInvalidParameter, "Unable to construct channel binding with the storage pool");
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterSolomonVolume, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
result->SetPathId(path.Base()->PathId.LocalPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_subdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_subdomain.cpp
index 6e977dc4f6..884308c708 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_subdomain.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_subdomain.cpp
@@ -312,6 +312,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterSubDomain, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
if (settings.HasDeclaredSchemeQuotas()) {
alterData->SetDeclaredSchemeQuotas(settings.GetDeclaredSchemeQuotas());
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp
index fec8cb34e6..5378350ecf 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp
@@ -600,6 +600,10 @@ public:
return result;
}
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterTable, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
table->PrepareAlter(alterData);
PrepareChanges(OperationId, path.Base(), table, bindingChanges, context);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_user_attrs.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_user_attrs.cpp
index 96f7bca898..d648fc5c8c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_user_attrs.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_user_attrs.cpp
@@ -83,6 +83,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxAlterUserAttributes, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
NIceDb::TNiceDb db(context.GetDB());
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h
index cda438a963..b65b3b1715 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h
@@ -638,6 +638,10 @@ public:
result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TxType, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
PrepareChanges(path.Base(), context);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp
index 5e6612a946..3f41a88f4e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp
@@ -545,6 +545,10 @@ public:
return result;
}
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCopyTable, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
auto guard = context.DbGuard();
TPathId allocatedPathId = context.SS->AllocatePathId();
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_bsv.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_bsv.cpp
index ea2c10eb13..cbb66037aa 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_bsv.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_bsv.cpp
@@ -373,6 +373,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateBlockStoreVolume, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index de5dc705ba..f2cd544cad 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -224,6 +224,10 @@ public:
result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateCdcStream, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
auto stream = TCdcStreamInfo::Create(streamDesc);
Y_VERIFY(stream);
@@ -449,6 +453,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateCdcStreamAtTable, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
context.DbChanges.PersistTxState(OperationId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_extsubdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_extsubdomain.cpp
index b608520af1..2f020ba4d8 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_extsubdomain.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_extsubdomain.cpp
@@ -217,6 +217,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateExtSubDomain, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_fs.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_fs.cpp
index 54b9125fd0..e835cd2403 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_fs.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_fs.cpp
@@ -428,6 +428,10 @@ THolder<TProposeResponse> TCreateFileStore::Propose(
result->SetError(status, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateFileStore, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp
index cfcec10af4..a7f7184b94 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp
@@ -215,21 +215,25 @@ public:
}
}
- TString errMsg;
+ TString errStr;
- if (!context.SS->CheckLocks(parentPath.Base()->PathId, Transaction, errMsg)) {
- result->SetError(NKikimrScheme::StatusMultipleModifications, errMsg);
+ if (!context.SS->CheckLocks(parentPath.Base()->PathId, Transaction, errStr)) {
+ result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
return result;
}
TTableIndexInfo::TPtr newIndexData = nullptr;
{
- newIndexData = TTableIndexInfo::Create(tableIndexCreation, errMsg);
+ newIndexData = TTableIndexInfo::Create(tableIndexCreation, errStr);
if (!newIndexData) {
- result->SetError(TEvSchemeShard::EStatus::StatusInvalidParameter, errMsg);
+ result->SetError(TEvSchemeShard::EStatus::StatusInvalidParameter, errStr);
return result;
}
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateTableIndex, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
auto guard = context.DbGuard();
TPathId allocatedPathId = context.SS->AllocatePathId();
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_kesus.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_kesus.cpp
index f44ee9e0a2..9cc2b446ac 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_kesus.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_kesus.cpp
@@ -411,6 +411,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateKesus, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp
index b2a2e90320..4e56045032 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp
@@ -248,9 +248,13 @@ public:
return result;
}
- TString errMsg;
- if (!context.SS->CheckLocks(pathId, Transaction, errMsg)) {
- result->SetError(NKikimrScheme::StatusMultipleModifications, errMsg);
+ TString errStr;
+ if (!context.SS->CheckLocks(pathId, Transaction, errStr)) {
+ result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
+ return result;
+ }
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateLockForIndexBuild, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
return result;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
index ad91e488ec..ffa94ef319 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
@@ -645,6 +645,10 @@ public:
result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateOlapStore, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
const ui64 shardsToCreate = storeInfo->ColumnShards.size();
{
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
index 07006a0203..9ae12a29f7 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
@@ -729,6 +729,10 @@ public:
result->SetError(status, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateColumnTable, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
index f5976ca648..4aca35cf9a 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
@@ -471,6 +471,10 @@ public:
} else {
pqChannelsBinding = tabletChannelsBinding;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreatePQGroup, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp
index d61c5f1134..666ebb6e46 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp
@@ -349,6 +349,10 @@ public:
return result;
}
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateReplication, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
path.MaterializeLeaf(owner);
path->CreateTxId = OperationId.GetTxId();
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_rtmr.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_rtmr.cpp
index d2c169f432..52f7007f25 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_rtmr.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_rtmr.cpp
@@ -329,6 +329,10 @@ public:
result->SetError(NKikimrScheme::StatusInvalidParameter, "Unable to construct channel binding with the storage pool");
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateRtmrVolume, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp
index f66755dce1..4ca83560e5 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp
@@ -434,6 +434,10 @@ public:
return result;
}
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateSequence, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath->PathId.LocalPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_solomon.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_solomon.cpp
index 6aa4b20d30..9a67d43987 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_solomon.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_solomon.cpp
@@ -364,6 +364,10 @@ public:
result->SetError(NKikimrScheme::StatusInvalidParameter, "Unable to construct channel binding with the storage pool");
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateSolomonVolume, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp
index 2d9d180e36..f863406ec9 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp
@@ -272,6 +272,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateSubDomain, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp
index 0d111051cd..07825405ee 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp
@@ -619,6 +619,10 @@ public:
result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxCreateTable, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp
index 69efddbf05..66638d4efc 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp
@@ -249,6 +249,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropBlockStoreVolume, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropBlockStoreVolume, path.Base()->PathId);
// Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped"
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
index c0d3dd5ba3..db02b475aa 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
@@ -180,6 +180,11 @@ public:
return result;
}
}
+ TString errStr;
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropCdcStream, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
auto guard = context.DbGuard();
context.DbChanges.PersistTxState(OperationId);
@@ -384,6 +389,10 @@ public:
result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropCdcStreamAtTable, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
context.DbChanges.PersistTxState(OperationId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_extsubdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_extsubdomain.cpp
index 9dd0dae03a..5db794ccf0 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_extsubdomain.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_extsubdomain.cpp
@@ -327,6 +327,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxForceDropExtSubDomain, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxForceDropExtSubDomain, path.Base()->PathId);
txState.State = TTxState::Waiting;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp
index d76e89c4f8..2f99a3b367 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp
@@ -302,6 +302,10 @@ THolder<TProposeResponse> TDropFileStore::Propose(
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropFileStore, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropFileStore, path.Base()->PathId);
// Dirty hack: operation step must not be zero because 0 is treated as "hasn't been operationped"
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp
index 40dde95922..61958f7d75 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp
@@ -362,7 +362,10 @@ public:
return result;
}
}
-
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropTableIndexAtMainTable, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId));
TTableInfo::TPtr table = context.SS->Tables.at(tablePath.Base()->PathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp
index d471698590..446c86b93b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp
@@ -367,6 +367,11 @@ public:
return result;
}
}
+ TString errStr;
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropTableIndex, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
Y_VERIFY(context.SS->Indexes.contains(index.Base()->PathId));
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp
index 947ca40808..c8830da7f1 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp
@@ -244,6 +244,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropKesus, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropKesus, path.Base()->PathId);
// Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped"
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp
index c99cc5664a..464aa1e142 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp
@@ -147,9 +147,13 @@ public:
return result;
}
- TString errMsg;
- if (!context.SS->CheckLocks(pathId, Transaction, errMsg)) {
- result->SetError(NKikimrScheme::StatusMultipleModifications, errMsg);
+ TString errStr;
+ if (!context.SS->CheckLocks(pathId, Transaction, errStr)) {
+ result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
+ return result;
+ }
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropLock, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
return result;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp
index 02cd024c7c..369b25fa8b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp
@@ -354,6 +354,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropOlapStore, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropOlapStore, path.Base()->PathId);
txState.State = TTxState::DropParts;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
index 15462236ae..21719ccdde 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
@@ -407,6 +407,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropColumnTable, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
Y_VERIFY(storeInfo->ColumnTables.contains(path->PathId));
storeInfo->ColumnTablesUnderOperation.insert(path->PathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
index a85a65dca2..6a9fd0e230 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
@@ -469,6 +469,10 @@ public:
result->SetError(NKikimrScheme::StatusMultipleModifications, "Drop over Create/Alter");
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropPQGroup, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropPQGroup, path.Base()->PathId);
// Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped"
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp
index f6acf0a4d8..92ea900e21 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp
@@ -301,6 +301,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropReplication, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
Y_VERIFY(context.SS->Replications.contains(path->PathId));
auto replication = context.SS->Replications.at(path->PathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp
index 16e1cad401..a37c4a204b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp
@@ -364,6 +364,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropSequence, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropSequence, path->PathId);
txState.State = TTxState::DropParts;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp
index 71b3042e6b..949a4b4fa9 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp
@@ -228,6 +228,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropSolomonVolume, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropSolomonVolume, path.Base()->PathId);
// Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped"
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_subdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_subdomain.cpp
index e29d217d9b..42b51ed9c9 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_subdomain.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_subdomain.cpp
@@ -227,6 +227,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropSubDomain, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropSubDomain, path.Base()->PathId);
// Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped"
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp
index c558b21fbc..2d9e55d2ca 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp
@@ -597,6 +597,10 @@ public:
result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxDropTable, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
auto guard = context.DbGuard();
context.MemChanges.GrabNewTxState(context.SS, OperationId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp
index 3652b0b24c..b341707a66 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp
@@ -263,6 +263,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxForceDropSubDomain, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxForceDropSubDomain, path.Base()->PathId);
txState.State = TTxState::Waiting;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp
index 60b96ae464..e816ce300a 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp
@@ -406,6 +406,10 @@ public:
result->SetError(TEvSchemeShard::EStatus::StatusPathDoesNotExist, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxFinalizeBuildIndex, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
NIceDb::TNiceDb db(context.GetDB());
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp
index 4d41f250b4..0428224913 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp
@@ -425,6 +425,10 @@ public:
result->SetError(TEvSchemeShard::EStatus::StatusSchemeError, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxInitializeBuildIndex, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
NIceDb::TNiceDb db(context.GetDB());
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_mkdir.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_mkdir.cpp
index 9263411105..8d6e6e9262 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_mkdir.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_mkdir.cpp
@@ -221,6 +221,10 @@ public:
result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxMkDir, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
auto guard = context.DbGuard();
TPathId allocatedPathId = context.SS->AllocatePathId();
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp
index 8ebf3bd757..b565e4ac90 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp
@@ -436,6 +436,10 @@ public:
result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxUpdateMainTableOnIndexMove, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId));
TTableInfo::TPtr table = context.SS->Tables.at(tablePath.Base()->PathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
index dc46563bab..0a6c643789 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
@@ -701,6 +701,10 @@ public:
result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxMoveTable, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
auto guard = context.DbGuard();
TPathId allocatedPathId = context.SS->AllocatePathId();
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp
index 360126a2b5..ae6aa44978 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp
@@ -515,6 +515,10 @@ public:
result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxMoveTableIndex, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
auto guard = context.DbGuard();
TPathId allocatedPathId = context.SS->AllocatePathId();
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp
index 3701119300..b0ecf787b4 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp
@@ -76,6 +76,10 @@ public:
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxRmDir, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
NIceDb::TNiceDb db(context.GetDB());
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxRmDir, path.Base()->PathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
index 6716e81d75..ace2f95390 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
@@ -954,6 +954,10 @@ public:
result->SetError(NKikimrScheme::StatusInvalidParameter, "Invalid request: only 1->N or N->1 are supported");
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxSplitTablePartition, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
///////////
/// Accept operation
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp
index 500bac6ec4..b83f4e6028 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp
@@ -1231,6 +1231,10 @@ public:
result->SetError(NKikimrScheme::StatusMultipleModifications, msg);
return result;
}
+ if (!context.SS->CheckInFlightLimit(TTxState::TxUpgradeSubDomain, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
TSubDomainInfo::TPtr alterData = new TSubDomainInfo(*subDomain,
subDomain->GetPlanResolution(),
@@ -1496,6 +1500,10 @@ public:
result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
return result;
};
+ if (!context.SS->CheckInFlightLimit(TTxState::TxUpgradeSubDomainDecision, errStr)) {
+ result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ return result;
+ }
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxUpgradeSubDomainDecision, path.Base()->PathId);
txState.State = TTxState::Waiting;
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 6a1c138573..49c45cf3ab 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -3839,6 +3839,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
ConfigureCompactionQueues(appData->CompactionConfig, ctx);
ConfigureStatsBatching(appData->SchemeShardConfig, ctx);
+ ConfigureStatsOperations(appData->SchemeShardConfig, ctx);
if (appData->ChannelProfiles) {
ChannelProfiles = appData->ChannelProfiles;
@@ -6099,7 +6100,9 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
}
if (appConfig.HasSchemeShardConfig()) {
- ConfigureStatsBatching(appConfig.GetSchemeShardConfig(), ctx);
+ const auto& schemeShardConfig = appConfig.GetSchemeShardConfig();
+ ConfigureStatsBatching(schemeShardConfig, ctx);
+ ConfigureStatsOperations(schemeShardConfig, ctx);
}
if (IsShemeShardConfigured()) {
@@ -6132,6 +6135,45 @@ void TSchemeShard::ConfigureStatsBatching(const NKikimrConfig::TSchemeShardConfi
<< ", StatsMaxExecuteTime# " << StatsMaxExecuteTime);
}
+void TSchemeShard::ConfigureStatsOperations(const NKikimrConfig::TSchemeShardConfig& config, const TActorContext& ctx) {
+ for (const auto& operationConfig: config.GetInFlightCounterConfig()) {
+ ui32 limit = operationConfig.GetInFlightLimit();
+ auto txState = TTxState::ConvertToTxType(operationConfig.GetType());
+ InFlightLimits[txState] = limit;
+ }
+
+ if (InFlightLimits.empty()) {
+ NKikimrConfig::TSchemeShardConfig_TInFlightCounterConfig inFlightCounterConfig;
+ auto defaultInFlightLimit = inFlightCounterConfig.GetInFlightLimit();
+ InFlightLimits[TTxState::ETxType::TxSplitTablePartition] = defaultInFlightLimit;
+ InFlightLimits[TTxState::ETxType::TxMergeTablePartition] = defaultInFlightLimit;
+ LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "OperationsProcessing config: using default configuration");
+ }
+
+ for (auto it = InFlightLimits.begin(); it != InFlightLimits.end(); ++it) {
+ LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "OperationsProcessing config: type " << TTxState::TypeName(it->first)
+ << ", limit " << it->second);
+ }
+}
+
+bool TSchemeShard::CheckInFlightLimit(const TTxState::ETxType txType, TString& errStr) const {
+ auto it = InFlightLimits.find(txType);
+ if (it == InFlightLimits.end()) {
+ return true;
+ }
+ if (it->second != 0 && TabletCounters->Simple()[TTxState::TxTypeInFlightCounter(txType)].Get() >= it->second)
+ {
+ errStr = TStringBuilder() << "the limit of operations with type " << TTxState::TypeName(txType)
+ << " has been exceeded"
+ << ", limit: " << it->second;
+ return false;
+ }
+
+ return true;
+}
+
void TSchemeShard::ConfigureCompactionQueues(
const NKikimrConfig::TCompactionConfig& compactionConfig,
const TActorContext &ctx)
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index d68e328952..6c6be75b96 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -258,6 +258,7 @@ public:
TDuration StatsMaxExecuteTime;
TDuration StatsBatchTimeout;
ui32 StatsMaxBatchSize = 0;
+ THashMap<TTxState::ETxType, ui32> InFlightLimits;
// time when we opened the batch
TMonotonic StatsBatchStartTs;
@@ -382,6 +383,10 @@ public:
const NKikimrConfig::TSchemeShardConfig& config,
const TActorContext &ctx);
+ void ConfigureStatsOperations(
+ const NKikimrConfig::TSchemeShardConfig& config,
+ const TActorContext &ctx);
+
void ConfigureCompactionQueues(
const NKikimrConfig::TCompactionConfig& config,
const TActorContext &ctx);
@@ -394,6 +399,10 @@ public:
const NKikimrConfig::TCompactionConfig::TBorrowedCompactionConfig& config,
const TActorContext &ctx);
+ bool CheckInFlightLimit(
+ const TTxState::ETxType txType,
+ TString& errStr) const;
+
void StartStopCompactionQueues();
bool ApplyStorageConfig(const TStoragePools& storagePools,
diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
index e6260a0e34..c1dc7fbe91 100644
--- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
+++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h
@@ -38,6 +38,7 @@ struct TTxState {
#define TX_STATE_ENUM_NAME(n, ...) case n: return #n;
#define TX_STATE_IN_FLIGHT_COUNTER(n, ...) case n: return COUNTER_IN_FLIGHT_OPS_##n ;
#define TX_STATE_FINISHED_COUNTER(n, ...) case n: return COUNTER_FINISHED_OPS_##n ;
+ #define TX_STATE_FROM_COUNTER(n, ...) case ESimpleCounters::COUNTER_IN_FLIGHT_OPS_##n: return ETxType::n;
// WARNING: DO NOT REORDER this constants
// reordering breaks update
@@ -144,6 +145,13 @@ struct TTxState {
return COUNTER_FINISHED_OPS_UNKNOWN;
}
}
+ static ETxType ConvertToTxType(ESimpleCounters t) {
+ switch(t) {
+ TX_STATE_TYPE_ENUM(TX_STATE_FROM_COUNTER)
+ default:
+ return TTxState::ETxType::TxInvalid;
+ }
+ }
#undef TX_STATE_TYPE_ENUM
diff --git a/ydb/core/tx/schemeshard/ut_split_merge.cpp b/ydb/core/tx/schemeshard/ut_split_merge.cpp
index 45d8d8e0b1..9b70c3c73b 100644
--- a/ydb/core/tx/schemeshard/ut_split_merge.cpp
+++ b/ydb/core/tx/schemeshard/ut_split_merge.cpp
@@ -326,4 +326,84 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitTest) {
}, true);
}
-}
+ void TryMergeWithInflyLimit(TTestActorRuntime &runtime, TTestEnv &env, const ui64 mergeNum, const ui64 remainMergeNum, const ui64 acceptedMergeNum, ui64 &txId) {
+ const ui64 shardsNum = mergeNum * 2;
+ const ui64 startMergePart = mergeNum - remainMergeNum;
+ TSet<ui64> txIds;
+ ui64 startTxId = txId;
+ for (ui64 i = startMergePart * 2; i < shardsNum; i += 2) {
+ AsyncSplitTable(runtime, txId, "/MyRoot/Table",
+ Sprintf(R"(
+ SourceTabletId: %lu
+ SourceTabletId: %lu
+ )", TTestTxConfig::FakeHiveTablets + i, TTestTxConfig::FakeHiveTablets + i + 1));
+ txIds.insert(txId++);
+ }
+
+ for (ui64 i = startTxId; i < startTxId + acceptedMergeNum ; i++)
+ TestModificationResult(runtime, i, NKikimrScheme::StatusAccepted);
+ for (ui64 i = startTxId + acceptedMergeNum; i < txId; i++)
+ TestModificationResult(runtime, i, NKikimrScheme::StatusResourceExhausted);
+
+ env.TestWaitNotification(runtime, txIds);
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), {
+ NLs::ShardsInsideDomain(mergeNum + remainMergeNum - acceptedMergeNum)
+ });
+ };
+
+ void AsyncMergeWithInflyLimit(const ui64 mergeNum, const ui64 mergeLimit) {
+ const ui64 shardsNum = mergeNum * 2;
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 123;
+ auto& appData = runtime.GetAppData();
+
+ // set batching only by timeout
+ NKikimrConfig::TSchemeShardConfig_TInFlightCounterConfig *inFlightCounter = appData.SchemeShardConfig.AddInFlightCounterConfig();
+ inFlightCounter->SetType(NKikimr::NSchemeShard::ESimpleCounters::COUNTER_IN_FLIGHT_OPS_TxSplitTablePartition);
+ inFlightCounter->SetInFlightLimit(mergeLimit);
+ // apply config via reboot
+ TActorId sender = runtime.AllocateEdgeActor();
+ GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender);
+
+ TestCreateTable(runtime, txId++, "/MyRoot", Sprintf(R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64"}
+ Columns { Name: "value" Type: "Utf8"}
+ KeyColumnNames: ["key"]
+ UniformPartitionsCount: %lu
+ PartitionConfig {
+ PartitioningPolicy {
+ MinPartitionsCount: 0
+ }
+ })", shardsNum));
+
+ env.TestWaitNotification(runtime, txId - 1);
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"),
+ {NLs::IsTable,
+ NLs::ShardsInsideDomain(shardsNum)});
+ ui64 remainMergeNum = mergeNum;
+
+ while (remainMergeNum > 0)
+ {
+ ui64 acceptedMergeNum = mergeLimit == 0
+ ? remainMergeNum
+ : std::min(remainMergeNum, mergeLimit);
+ TryMergeWithInflyLimit(runtime, env, mergeNum, remainMergeNum, acceptedMergeNum, txId);
+ remainMergeNum -= acceptedMergeNum;
+ }
+ }
+
+ Y_UNIT_TEST(Make11MergeOperationsWithInflyLimit10) {
+ AsyncMergeWithInflyLimit(11, 10);
+ }
+
+ Y_UNIT_TEST(Make20MergeOperationsWithInflyLimit5) {
+ AsyncMergeWithInflyLimit(20, 5);
+ }
+
+ Y_UNIT_TEST(Make20MergeOperationsWithoutLimit) {
+ AsyncMergeWithInflyLimit(20, 0);
+ }
+
+} \ No newline at end of file