diff options
author | andrew-rykov <arykov@ydb.tech> | 2022-08-12 13:10:40 +0300 |
---|---|---|
committer | andrew-rykov <arykov@ydb.tech> | 2022-08-12 13:10:40 +0300 |
commit | 4e6523601af8960a820bdd1cf3f3f0c3e1c0929a (patch) | |
tree | fbd6deb1a80b4372a8906b098ccea6062c95982b | |
parent | 31d848f5b32d3baffd1209315c00234e753a755b (diff) | |
download | ydb-4e6523601af8960a820bdd1cf3f3f0c3e1c0929a.tar.gz |
limit infly operations
62 files changed, 415 insertions, 17 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 7b066290da..a77dff0c1e 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -6,6 +6,7 @@ import "ydb/core/protos/blobstorage_vdisk_config.proto"; import "ydb/core/protos/blobstorage_pdisk_config.proto"; import "ydb/core/protos/cms.proto"; import "ydb/core/protos/config_units.proto"; +import "ydb/core/protos/counters_schemeshard.proto"; import "ydb/core/protos/drivemodel.proto"; import "ydb/core/protos/flat_scheme_op.proto"; import "ydb/core/protos/http_config.proto"; @@ -1436,6 +1437,12 @@ message TDataShardConfig { } message TSchemeShardConfig { + message TInFlightCounterConfig { + optional NKikimr.NSchemeShard.ESimpleCounters Type = 1; + // after this amount scheme shard begin to abort the operations + // to disable set to 0 + optional uint32 InFlightLimit = 2 [default = 10000]; + } // after this amount of time we forcely write full stats to local DB // to disable set to 0 optional uint32 StatsBatchTimeoutMs = 1 [default = 100]; @@ -1445,6 +1452,8 @@ message TSchemeShardConfig { optional uint32 StatsMaxBatchSize = 2 [default = 100]; optional uint32 StatsMaxExecuteMs = 3 [default = 10]; + + repeated TInFlightCounterConfig InFlightCounterConfig = 4; } message TCompactionConfig { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_bsv.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_bsv.cpp index 899be0d8e9..12e0ae984e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_bsv.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_bsv.cpp @@ -622,6 +622,11 @@ public: volume->ForgetAlter(); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterBlockStoreVolume, errStr)) + { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } // Increase in occupied space is applied immediately domainDir->ChangeVolumeSpaceBegin(newVolumeSpace, oldVolumeSpace); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index b8c2dbc973..ab296763d3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -172,6 +172,12 @@ public: return result; } } + TString errStr; + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterCdcStream, errStr)) + { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } auto guard = context.DbGuard(); context.DbChanges.PersistAlterCdcStream(streamPath.Base()->PathId); @@ -393,6 +399,11 @@ public: result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterCdcStreamAtTable, errStr)) + { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } context.DbChanges.PersistTxState(OperationId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp index dd3ad4bfe4..bef536e106 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp @@ -382,6 +382,11 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterExtSubDomain, errStr)) + { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } if (settings.HasDeclaredSchemeQuotas()) { alterData->SetDeclaredSchemeQuotas(settings.GetDeclaredSchemeQuotas()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_fs.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_fs.cpp index 64a1cb8ba4..b602718eb5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_fs.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_fs.cpp @@ -404,6 +404,10 @@ THolder<TProposeResponse> TAlterFileStore::Propose( result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterFileStore, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } fs->PrepareAlter(*alterConfig); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp index c4c3261740..25aa7885e4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp @@ -205,10 +205,14 @@ public: result->SetPathId(dstPath.Base()->PathId.LocalPathId); - TString errMsg; + TString errStr; - if (!context.SS->CheckLocks(parentPath.Base()->PathId, Transaction, errMsg)) { - result->SetError(NKikimrScheme::StatusMultipleModifications, errMsg); + if (!context.SS->CheckLocks(parentPath.Base()->PathId, Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); + return result; + } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterTableIndex, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_kesus.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_kesus.cpp index 33240bada8..4bff97af01 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_kesus.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_kesus.cpp @@ -361,6 +361,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterKesus, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } kesus->AlterConfig.Reset(new Ydb::Coordination::Config); kesus->AlterConfig->CopyFrom(kesus->Config); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp index c94721a26c..d8423e390e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp @@ -669,6 +669,10 @@ public: result->SetError(status, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterOlapStore, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } NIceDb::TNiceDb db(context.GetDB()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp index 14e1114e85..75b65912fe 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp @@ -496,6 +496,10 @@ public: result->SetError(status, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterColumnTable, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } Y_VERIFY(storeInfo->ColumnTables.contains(path->PathId)); storeInfo->ColumnTablesUnderOperation.insert(path->PathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp index da870c9a5b..52ccd544ba 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp @@ -589,6 +589,10 @@ public: "Unable to construct channel binding for PQ with the storage pool"); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterPQGroup, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } // This channel bindings are for PersQueue shards. They either use // explicit channel profiles, or reuse channel profile above. diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp index b7c1275ff8..9aa2d32328 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp @@ -291,6 +291,10 @@ public: result->SetError(NKikimrScheme::StatusInvalidParameter, "Unable to construct channel binding with the storage pool"); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterSolomonVolume, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } result->SetPathId(path.Base()->PathId.LocalPathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_subdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_subdomain.cpp index 6e977dc4f6..884308c708 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_subdomain.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_subdomain.cpp @@ -312,6 +312,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterSubDomain, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } if (settings.HasDeclaredSchemeQuotas()) { alterData->SetDeclaredSchemeQuotas(settings.GetDeclaredSchemeQuotas()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp index fec8cb34e6..5378350ecf 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp @@ -600,6 +600,10 @@ public: return result; } } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterTable, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } table->PrepareAlter(alterData); PrepareChanges(OperationId, path.Base(), table, bindingChanges, context); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_user_attrs.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_user_attrs.cpp index 96f7bca898..d648fc5c8c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_user_attrs.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_user_attrs.cpp @@ -83,6 +83,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxAlterUserAttributes, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } NIceDb::TNiceDb db(context.GetDB()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h index cda438a963..b65b3b1715 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h @@ -638,6 +638,10 @@ public: result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TxType, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } PrepareChanges(path.Base(), context); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index 5e6612a946..3f41a88f4e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -545,6 +545,10 @@ public: return result; } } + if (!context.SS->CheckInFlightLimit(TTxState::TxCopyTable, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } auto guard = context.DbGuard(); TPathId allocatedPathId = context.SS->AllocatePathId(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_bsv.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_bsv.cpp index ea2c10eb13..cbb66037aa 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_bsv.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_bsv.cpp @@ -373,6 +373,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateBlockStoreVolume, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath.Base()->PathId.LocalPathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index de5dc705ba..f2cd544cad 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -224,6 +224,10 @@ public: result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateCdcStream, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } auto stream = TCdcStreamInfo::Create(streamDesc); Y_VERIFY(stream); @@ -449,6 +453,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateCdcStreamAtTable, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } context.DbChanges.PersistTxState(OperationId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_extsubdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_extsubdomain.cpp index b608520af1..2f020ba4d8 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_extsubdomain.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_extsubdomain.cpp @@ -217,6 +217,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateExtSubDomain, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath.Base()->PathId.LocalPathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_fs.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_fs.cpp index 54b9125fd0..e835cd2403 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_fs.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_fs.cpp @@ -428,6 +428,10 @@ THolder<TProposeResponse> TCreateFileStore::Propose( result->SetError(status, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateFileStore, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath.Base()->PathId.LocalPathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp index cfcec10af4..a7f7184b94 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp @@ -215,21 +215,25 @@ public: } } - TString errMsg; + TString errStr; - if (!context.SS->CheckLocks(parentPath.Base()->PathId, Transaction, errMsg)) { - result->SetError(NKikimrScheme::StatusMultipleModifications, errMsg); + if (!context.SS->CheckLocks(parentPath.Base()->PathId, Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); return result; } TTableIndexInfo::TPtr newIndexData = nullptr; { - newIndexData = TTableIndexInfo::Create(tableIndexCreation, errMsg); + newIndexData = TTableIndexInfo::Create(tableIndexCreation, errStr); if (!newIndexData) { - result->SetError(TEvSchemeShard::EStatus::StatusInvalidParameter, errMsg); + result->SetError(TEvSchemeShard::EStatus::StatusInvalidParameter, errStr); return result; } } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateTableIndex, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } auto guard = context.DbGuard(); TPathId allocatedPathId = context.SS->AllocatePathId(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_kesus.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_kesus.cpp index f44ee9e0a2..9cc2b446ac 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_kesus.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_kesus.cpp @@ -411,6 +411,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateKesus, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath.Base()->PathId.LocalPathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp index b2a2e90320..4e56045032 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp @@ -248,9 +248,13 @@ public: return result; } - TString errMsg; - if (!context.SS->CheckLocks(pathId, Transaction, errMsg)) { - result->SetError(NKikimrScheme::StatusMultipleModifications, errMsg); + TString errStr; + if (!context.SS->CheckLocks(pathId, Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); + return result; + } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateLockForIndexBuild, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp index ad91e488ec..ffa94ef319 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp @@ -645,6 +645,10 @@ public: result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateOlapStore, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } const ui64 shardsToCreate = storeInfo->ColumnShards.size(); { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp index 07006a0203..9ae12a29f7 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp @@ -729,6 +729,10 @@ public: result->SetError(status, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateColumnTable, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath.Base()->PathId.LocalPathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp index f5976ca648..4aca35cf9a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp @@ -471,6 +471,10 @@ public: } else { pqChannelsBinding = tabletChannelsBinding; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreatePQGroup, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath.Base()->PathId.LocalPathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp index d61c5f1134..666ebb6e46 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp @@ -349,6 +349,10 @@ public: return result; } } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateReplication, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } path.MaterializeLeaf(owner); path->CreateTxId = OperationId.GetTxId(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_rtmr.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_rtmr.cpp index d2c169f432..52f7007f25 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_rtmr.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_rtmr.cpp @@ -329,6 +329,10 @@ public: result->SetError(NKikimrScheme::StatusInvalidParameter, "Unable to construct channel binding with the storage pool"); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateRtmrVolume, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath.Base()->PathId.LocalPathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp index f66755dce1..4ca83560e5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp @@ -434,6 +434,10 @@ public: return result; } } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateSequence, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath->PathId.LocalPathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_solomon.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_solomon.cpp index 6aa4b20d30..9a67d43987 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_solomon.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_solomon.cpp @@ -364,6 +364,10 @@ public: result->SetError(NKikimrScheme::StatusInvalidParameter, "Unable to construct channel binding with the storage pool"); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateSolomonVolume, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath.Base()->PathId.LocalPathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp index 2d9d180e36..f863406ec9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp @@ -272,6 +272,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateSubDomain, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath.Base()->PathId.LocalPathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp index 0d111051cd..07825405ee 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp @@ -619,6 +619,10 @@ public: result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxCreateTable, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath.Base()->PathId.LocalPathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp index 69efddbf05..66638d4efc 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp @@ -249,6 +249,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropBlockStoreVolume, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropBlockStoreVolume, path.Base()->PathId); // Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped" diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp index c0d3dd5ba3..db02b475aa 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp @@ -180,6 +180,11 @@ public: return result; } } + TString errStr; + if (!context.SS->CheckInFlightLimit(TTxState::TxDropCdcStream, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } auto guard = context.DbGuard(); context.DbChanges.PersistTxState(OperationId); @@ -384,6 +389,10 @@ public: result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropCdcStreamAtTable, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } context.DbChanges.PersistTxState(OperationId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_extsubdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_extsubdomain.cpp index 9dd0dae03a..5db794ccf0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_extsubdomain.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_extsubdomain.cpp @@ -327,6 +327,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxForceDropExtSubDomain, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxForceDropExtSubDomain, path.Base()->PathId); txState.State = TTxState::Waiting; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp index d76e89c4f8..2f99a3b367 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp @@ -302,6 +302,10 @@ THolder<TProposeResponse> TDropFileStore::Propose( result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropFileStore, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropFileStore, path.Base()->PathId); // Dirty hack: operation step must not be zero because 0 is treated as "hasn't been operationped" diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp index 40dde95922..61958f7d75 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp @@ -362,7 +362,10 @@ public: return result; } } - + if (!context.SS->CheckInFlightLimit(TTxState::TxDropTableIndexAtMainTable, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId)); TTableInfo::TPtr table = context.SS->Tables.at(tablePath.Base()->PathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp index d471698590..446c86b93b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp @@ -367,6 +367,11 @@ public: return result; } } + TString errStr; + if (!context.SS->CheckInFlightLimit(TTxState::TxDropTableIndex, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } Y_VERIFY(context.SS->Indexes.contains(index.Base()->PathId)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp index 947ca40808..c8830da7f1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp @@ -244,6 +244,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropKesus, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropKesus, path.Base()->PathId); // Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped" diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp index c99cc5664a..464aa1e142 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp @@ -147,9 +147,13 @@ public: return result; } - TString errMsg; - if (!context.SS->CheckLocks(pathId, Transaction, errMsg)) { - result->SetError(NKikimrScheme::StatusMultipleModifications, errMsg); + TString errStr; + if (!context.SS->CheckLocks(pathId, Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); + return result; + } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropLock, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp index 02cd024c7c..369b25fa8b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp @@ -354,6 +354,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropOlapStore, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropOlapStore, path.Base()->PathId); txState.State = TTxState::DropParts; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp index 15462236ae..21719ccdde 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp @@ -407,6 +407,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropColumnTable, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } Y_VERIFY(storeInfo->ColumnTables.contains(path->PathId)); storeInfo->ColumnTablesUnderOperation.insert(path->PathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp index a85a65dca2..6a9fd0e230 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp @@ -469,6 +469,10 @@ public: result->SetError(NKikimrScheme::StatusMultipleModifications, "Drop over Create/Alter"); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropPQGroup, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropPQGroup, path.Base()->PathId); // Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped" diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp index f6acf0a4d8..92ea900e21 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp @@ -301,6 +301,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropReplication, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } Y_VERIFY(context.SS->Replications.contains(path->PathId)); auto replication = context.SS->Replications.at(path->PathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp index 16e1cad401..a37c4a204b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp @@ -364,6 +364,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropSequence, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropSequence, path->PathId); txState.State = TTxState::DropParts; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp index 71b3042e6b..949a4b4fa9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp @@ -228,6 +228,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropSolomonVolume, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropSolomonVolume, path.Base()->PathId); // Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped" diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_subdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_subdomain.cpp index e29d217d9b..42b51ed9c9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_subdomain.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_subdomain.cpp @@ -227,6 +227,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropSubDomain, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropSubDomain, path.Base()->PathId); // Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped" diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp index c558b21fbc..2d9e55d2ca 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp @@ -597,6 +597,10 @@ public: result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxDropTable, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } auto guard = context.DbGuard(); context.MemChanges.GrabNewTxState(context.SS, OperationId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp index 3652b0b24c..b341707a66 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp @@ -263,6 +263,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxForceDropSubDomain, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxForceDropSubDomain, path.Base()->PathId); txState.State = TTxState::Waiting; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp index 60b96ae464..e816ce300a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp @@ -406,6 +406,10 @@ public: result->SetError(TEvSchemeShard::EStatus::StatusPathDoesNotExist, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxFinalizeBuildIndex, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } NIceDb::TNiceDb db(context.GetDB()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp index 4d41f250b4..0428224913 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp @@ -425,6 +425,10 @@ public: result->SetError(TEvSchemeShard::EStatus::StatusSchemeError, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxInitializeBuildIndex, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } NIceDb::TNiceDb db(context.GetDB()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_mkdir.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_mkdir.cpp index 9263411105..8d6e6e9262 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_mkdir.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_mkdir.cpp @@ -221,6 +221,10 @@ public: result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxMkDir, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } auto guard = context.DbGuard(); TPathId allocatedPathId = context.SS->AllocatePathId(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp index 8ebf3bd757..b565e4ac90 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp @@ -436,6 +436,10 @@ public: result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxUpdateMainTableOnIndexMove, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId)); TTableInfo::TPtr table = context.SS->Tables.at(tablePath.Base()->PathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp index dc46563bab..0a6c643789 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp @@ -701,6 +701,10 @@ public: result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxMoveTable, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } auto guard = context.DbGuard(); TPathId allocatedPathId = context.SS->AllocatePathId(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp index 360126a2b5..ae6aa44978 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_table_index.cpp @@ -515,6 +515,10 @@ public: result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxMoveTableIndex, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } auto guard = context.DbGuard(); TPathId allocatedPathId = context.SS->AllocatePathId(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp index 3701119300..b0ecf787b4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp @@ -76,6 +76,10 @@ public: result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxRmDir, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } NIceDb::TNiceDb db(context.GetDB()); TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxRmDir, path.Base()->PathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp index 6716e81d75..ace2f95390 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp @@ -954,6 +954,10 @@ public: result->SetError(NKikimrScheme::StatusInvalidParameter, "Invalid request: only 1->N or N->1 are supported"); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxSplitTablePartition, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } /////////// /// Accept operation diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp index 500bac6ec4..b83f4e6028 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp @@ -1231,6 +1231,10 @@ public: result->SetError(NKikimrScheme::StatusMultipleModifications, msg); return result; } + if (!context.SS->CheckInFlightLimit(TTxState::TxUpgradeSubDomain, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } TSubDomainInfo::TPtr alterData = new TSubDomainInfo(*subDomain, subDomain->GetPlanResolution(), @@ -1496,6 +1500,10 @@ public: result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); return result; }; + if (!context.SS->CheckInFlightLimit(TTxState::TxUpgradeSubDomainDecision, errStr)) { + result->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + return result; + } TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxUpgradeSubDomainDecision, path.Base()->PathId); txState.State = TTxState::Waiting; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 6a1c138573..49c45cf3ab 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -3839,6 +3839,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { ConfigureCompactionQueues(appData->CompactionConfig, ctx); ConfigureStatsBatching(appData->SchemeShardConfig, ctx); + ConfigureStatsOperations(appData->SchemeShardConfig, ctx); if (appData->ChannelProfiles) { ChannelProfiles = appData->ChannelProfiles; @@ -6099,7 +6100,9 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi } if (appConfig.HasSchemeShardConfig()) { - ConfigureStatsBatching(appConfig.GetSchemeShardConfig(), ctx); + const auto& schemeShardConfig = appConfig.GetSchemeShardConfig(); + ConfigureStatsBatching(schemeShardConfig, ctx); + ConfigureStatsOperations(schemeShardConfig, ctx); } if (IsShemeShardConfigured()) { @@ -6132,6 +6135,45 @@ void TSchemeShard::ConfigureStatsBatching(const NKikimrConfig::TSchemeShardConfi << ", StatsMaxExecuteTime# " << StatsMaxExecuteTime); } +void TSchemeShard::ConfigureStatsOperations(const NKikimrConfig::TSchemeShardConfig& config, const TActorContext& ctx) { + for (const auto& operationConfig: config.GetInFlightCounterConfig()) { + ui32 limit = operationConfig.GetInFlightLimit(); + auto txState = TTxState::ConvertToTxType(operationConfig.GetType()); + InFlightLimits[txState] = limit; + } + + if (InFlightLimits.empty()) { + NKikimrConfig::TSchemeShardConfig_TInFlightCounterConfig inFlightCounterConfig; + auto defaultInFlightLimit = inFlightCounterConfig.GetInFlightLimit(); + InFlightLimits[TTxState::ETxType::TxSplitTablePartition] = defaultInFlightLimit; + InFlightLimits[TTxState::ETxType::TxMergeTablePartition] = defaultInFlightLimit; + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "OperationsProcessing config: using default configuration"); + } + + for (auto it = InFlightLimits.begin(); it != InFlightLimits.end(); ++it) { + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "OperationsProcessing config: type " << TTxState::TypeName(it->first) + << ", limit " << it->second); + } +} + +bool TSchemeShard::CheckInFlightLimit(const TTxState::ETxType txType, TString& errStr) const { + auto it = InFlightLimits.find(txType); + if (it == InFlightLimits.end()) { + return true; + } + if (it->second != 0 && TabletCounters->Simple()[TTxState::TxTypeInFlightCounter(txType)].Get() >= it->second) + { + errStr = TStringBuilder() << "the limit of operations with type " << TTxState::TypeName(txType) + << " has been exceeded" + << ", limit: " << it->second; + return false; + } + + return true; +} + void TSchemeShard::ConfigureCompactionQueues( const NKikimrConfig::TCompactionConfig& compactionConfig, const TActorContext &ctx) diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index d68e328952..6c6be75b96 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -258,6 +258,7 @@ public: TDuration StatsMaxExecuteTime; TDuration StatsBatchTimeout; ui32 StatsMaxBatchSize = 0; + THashMap<TTxState::ETxType, ui32> InFlightLimits; // time when we opened the batch TMonotonic StatsBatchStartTs; @@ -382,6 +383,10 @@ public: const NKikimrConfig::TSchemeShardConfig& config, const TActorContext &ctx); + void ConfigureStatsOperations( + const NKikimrConfig::TSchemeShardConfig& config, + const TActorContext &ctx); + void ConfigureCompactionQueues( const NKikimrConfig::TCompactionConfig& config, const TActorContext &ctx); @@ -394,6 +399,10 @@ public: const NKikimrConfig::TCompactionConfig::TBorrowedCompactionConfig& config, const TActorContext &ctx); + bool CheckInFlightLimit( + const TTxState::ETxType txType, + TString& errStr) const; + void StartStopCompactionQueues(); bool ApplyStorageConfig(const TStoragePools& storagePools, diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index e6260a0e34..c1dc7fbe91 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -38,6 +38,7 @@ struct TTxState { #define TX_STATE_ENUM_NAME(n, ...) case n: return #n; #define TX_STATE_IN_FLIGHT_COUNTER(n, ...) case n: return COUNTER_IN_FLIGHT_OPS_##n ; #define TX_STATE_FINISHED_COUNTER(n, ...) case n: return COUNTER_FINISHED_OPS_##n ; + #define TX_STATE_FROM_COUNTER(n, ...) case ESimpleCounters::COUNTER_IN_FLIGHT_OPS_##n: return ETxType::n; // WARNING: DO NOT REORDER this constants // reordering breaks update @@ -144,6 +145,13 @@ struct TTxState { return COUNTER_FINISHED_OPS_UNKNOWN; } } + static ETxType ConvertToTxType(ESimpleCounters t) { + switch(t) { + TX_STATE_TYPE_ENUM(TX_STATE_FROM_COUNTER) + default: + return TTxState::ETxType::TxInvalid; + } + } #undef TX_STATE_TYPE_ENUM diff --git a/ydb/core/tx/schemeshard/ut_split_merge.cpp b/ydb/core/tx/schemeshard/ut_split_merge.cpp index 45d8d8e0b1..9b70c3c73b 100644 --- a/ydb/core/tx/schemeshard/ut_split_merge.cpp +++ b/ydb/core/tx/schemeshard/ut_split_merge.cpp @@ -326,4 +326,84 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitTest) { }, true); } -} + void TryMergeWithInflyLimit(TTestActorRuntime &runtime, TTestEnv &env, const ui64 mergeNum, const ui64 remainMergeNum, const ui64 acceptedMergeNum, ui64 &txId) { + const ui64 shardsNum = mergeNum * 2; + const ui64 startMergePart = mergeNum - remainMergeNum; + TSet<ui64> txIds; + ui64 startTxId = txId; + for (ui64 i = startMergePart * 2; i < shardsNum; i += 2) { + AsyncSplitTable(runtime, txId, "/MyRoot/Table", + Sprintf(R"( + SourceTabletId: %lu + SourceTabletId: %lu + )", TTestTxConfig::FakeHiveTablets + i, TTestTxConfig::FakeHiveTablets + i + 1)); + txIds.insert(txId++); + } + + for (ui64 i = startTxId; i < startTxId + acceptedMergeNum ; i++) + TestModificationResult(runtime, i, NKikimrScheme::StatusAccepted); + for (ui64 i = startTxId + acceptedMergeNum; i < txId; i++) + TestModificationResult(runtime, i, NKikimrScheme::StatusResourceExhausted); + + env.TestWaitNotification(runtime, txIds); + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), { + NLs::ShardsInsideDomain(mergeNum + remainMergeNum - acceptedMergeNum) + }); + }; + + void AsyncMergeWithInflyLimit(const ui64 mergeNum, const ui64 mergeLimit) { + const ui64 shardsNum = mergeNum * 2; + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 123; + auto& appData = runtime.GetAppData(); + + // set batching only by timeout + NKikimrConfig::TSchemeShardConfig_TInFlightCounterConfig *inFlightCounter = appData.SchemeShardConfig.AddInFlightCounterConfig(); + inFlightCounter->SetType(NKikimr::NSchemeShard::ESimpleCounters::COUNTER_IN_FLIGHT_OPS_TxSplitTablePartition); + inFlightCounter->SetInFlightLimit(mergeLimit); + // apply config via reboot + TActorId sender = runtime.AllocateEdgeActor(); + GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender); + + TestCreateTable(runtime, txId++, "/MyRoot", Sprintf(R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64"} + Columns { Name: "value" Type: "Utf8"} + KeyColumnNames: ["key"] + UniformPartitionsCount: %lu + PartitionConfig { + PartitioningPolicy { + MinPartitionsCount: 0 + } + })", shardsNum)); + + env.TestWaitNotification(runtime, txId - 1); + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), + {NLs::IsTable, + NLs::ShardsInsideDomain(shardsNum)}); + ui64 remainMergeNum = mergeNum; + + while (remainMergeNum > 0) + { + ui64 acceptedMergeNum = mergeLimit == 0 + ? remainMergeNum + : std::min(remainMergeNum, mergeLimit); + TryMergeWithInflyLimit(runtime, env, mergeNum, remainMergeNum, acceptedMergeNum, txId); + remainMergeNum -= acceptedMergeNum; + } + } + + Y_UNIT_TEST(Make11MergeOperationsWithInflyLimit10) { + AsyncMergeWithInflyLimit(11, 10); + } + + Y_UNIT_TEST(Make20MergeOperationsWithInflyLimit5) { + AsyncMergeWithInflyLimit(20, 5); + } + + Y_UNIT_TEST(Make20MergeOperationsWithoutLimit) { + AsyncMergeWithInflyLimit(20, 0); + } + +}
\ No newline at end of file |