author     Aleksei Borzenkov <[email protected]>  2022-06-09 18:31:51 +0300
committer  Aleksei Borzenkov <[email protected]>  2022-06-09 18:31:51 +0300
commit     523dd687ec846447ca07d38c12ed29a85479cf68 (patch)
tree       8eed926f471f22c3e4eaf5c59a2990e23ccf7148
parent     0cfac1cb0d218fe93b0417b0663a9a876fa71e67 (diff)
Configure force shard split with icb, KIKIMR-15059
ref:a3c4cadd1169fb56856454e2cd331e6a8c43be87
6 files changed, 84 insertions, 34 deletions
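
Note: the patch wires the new limits through the immediate control board (ICB): config.proto declares the controls with their bounds and defaults, TSplitSettings registers them, and SchemeShard takes a consistent per-operation snapshot via GetForceShardSplitSettings(). As a rough sketch only (the field names come from the config.proto hunk below; the exact placement of this fragment inside a cluster configuration is an assumption):

```
# Hypothetical TImmediateControlsConfig fragment (protobuf text format);
# field names are from config.proto below, placement is assumed.
SchemeShardControls {
    ForceShardSplitDataSize: 4294967296  # force splits at 4 GiB instead of the 2 GiB default
    DisableForceShardSplit: 0            # keep forced splits enabled
}
```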
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index c324d24be15..05c9e4e7fc4 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1277,9 +1277,23 @@ message TImmediateControlsConfig {
             DefaultValue: 250000 }];
     }
 
+    message TSchemeShardControls {
+        optional uint64 ForceShardSplitDataSize = 1 [(ControlOptions) = {
+            Description: "Forces shards to split when reaching the given data size (2 GiB by default)",
+            MinValue: 10485760, // 10 MiB
+            MaxValue: 17179869184, // 16 GiB
+            DefaultValue: 2147483648 }];
+        optional uint64 DisableForceShardSplit = 2 [(ControlOptions) = {
+            Description: "Disables forced shard splits, for special cases only",
+            MinValue: 0,
+            MaxValue: 1,
+            DefaultValue: 0 }];
+    }
+
     optional TDataShardControls DataShardControls = 1;
     optional TTxLimitControls TxLimitControls = 2;
     optional TCoordinatorControls CoordinatorControls = 3;
+    optional TSchemeShardControls SchemeShardControls = 4;
 };
 
 message TMeteringConfig {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
index 055525640ff..6b1b99425e0 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
@@ -651,9 +651,10 @@ public:
         }
 
         auto srcShardIdx = tableInfo->GetPartitions()[srcPartitionIdx].ShardIdx;
+        const auto forceShardSplitSettings = context.SS->SplitSettings.GetForceShardSplitSettings();
 
         if (tableInfo->GetExpectedPartitionCount() + count - 1 > tableInfo->GetMaxPartitionsCount() &&
-            !tableInfo->IsForceSplitBySizeShardIdx(srcShardIdx))
+            !tableInfo->IsForceSplitBySizeShardIdx(srcShardIdx, forceShardSplitSettings))
         {
             errStr = "Reached MaxPartitionsCount limit: " + ToString(tableInfo->GetMaxPartitionsCount());
             return false;
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index 065836b85d1..66100f74349 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -141,6 +141,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
     }
 
     auto shardIdx = Self->TabletIdToShardIdx[datashardId];
+    const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings();
 
     TTableInfo::TPartitionStats newStats;
     newStats.SeqNo = TMessageSeqNo(rec.GetGeneration(), rec.GetRound());
@@ -258,7 +259,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
     }
 
     TVector<TShardIdx> shardsToMerge;
-    if (table->CheckCanMergePartitions(Self->SplitSettings, shardIdx, shardsToMerge)) {
+    if (table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge)) {
         TTxId txId = Self->GetCachedTxId(ctx);
 
         if (!txId) {
@@ -291,7 +292,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
     ui64 dataSizeResolution = 0; // Datashard will use default resolution
     ui64 rowCountResolution = 0; // Datashard will use default resolution
     bool collectKeySample = false;
-    if (table->ShouldSplitBySize(dataSize)) {
+    if (table->ShouldSplitBySize(dataSize, forceShardSplitSettings)) {
         // We would like to split by size and do this no matter how many partitions there are
     } else if (table->GetPartitions().size() >= table->GetMaxPartitionsCount()) {
         // We cannot split as there are max partitions already
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats_histogram.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats_histogram.cpp
index 4f15001413c..0c5adea88cc 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats_histogram.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats_histogram.cpp
@@ -308,9 +308,10 @@ bool TTxPartitionHistogram::Execute(TTransactionContext& txc, const TActorContex
         return true;
 
     auto shardIdx = Self->TabletIdToShardIdx[datashardId];
+    const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings();
 
     ESplitReason splitReason = ESplitReason::NO_SPLIT;
-    if (table->ShouldSplitBySize(dataSize)) {
+    if (table->ShouldSplitBySize(dataSize, forceShardSplitSettings)) {
         splitReason = ESplitReason::SPLIT_BY_SIZE;
     }
 
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index 9057806cb87..ebd7d4aa67a 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -1419,6 +1419,7 @@ void TTableInfo::FinishSplitMergeOp(TOperationId opId) {
 }
 
 bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings,
+                                    const TForceShardSplitSettings& forceShardSplitSettings,
                                     TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge,
                                     THashSet<TTabletId>& partOwners, ui64& totalSize, float& totalLoad) const
 {
@@ -1451,7 +1452,7 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings,
     bool canMerge = false;
 
     // Check if we can try merging by size
-    if (IsMergeBySizeEnabled() && stats->DataSize + totalSize <= GetSizeToMerge()) {
+    if (IsMergeBySizeEnabled(forceShardSplitSettings) && stats->DataSize + totalSize <= GetSizeToMerge(forceShardSplitSettings)) {
         canMerge = true;
     }
 
@@ -1466,7 +1467,7 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings,
         return false;
 
     // Check that total size doesn't exceed the limits
-    if (IsSplitBySizeEnabled() && stats->DataSize + totalSize >= GetShardSizeToSplit()*0.9) {
+    if (IsSplitBySizeEnabled(forceShardSplitSettings) && stats->DataSize + totalSize >= GetShardSizeToSplit(forceShardSplitSettings)*0.9) {
         return false;
     }
 
@@ -1500,7 +1501,10 @@
     return true;
 }
 
-bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings, TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge) const {
+bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings,
+                                         const TForceShardSplitSettings& forceShardSplitSettings,
+                                         TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge) const
+{
     // Don't split/merge backup tables
     if (IsBackup) {
         return false;
@@ -1527,12 +1531,12 @@ bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings, TS
     THashSet<TTabletId> partOwners;
 
     // Make sure we can actually merge current shard first
-    if (!TryAddShardToMerge(splitSettings, shardIdx, shardsToMerge, partOwners, totalSize, totalLoad)) {
+    if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, partOwners, totalSize, totalLoad)) {
         return false;
     }
 
     for (i64 pi = partitionIdx - 1; pi >= 0; --pi) {
-        if (!TryAddShardToMerge(splitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad)) {
+        if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad)) {
             break;
         }
     }
@@ -1540,7 +1544,7 @@ bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings, TS
     Reverse(shardsToMerge.begin(), shardsToMerge.end());
 
     for (ui64 pi = partitionIdx + 1; pi < GetPartitions().size(); ++pi) {
-        if (!TryAddShardToMerge(splitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad)) {
+        if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad)) {
            break;
         }
     }
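
For orientation, the merge-candidate walk in CheckCanMergePartitions above starts at the shard that reported stats, grows the run leftwards while TryAddShardToMerge keeps succeeding, reverses it to restore key order, and then grows it rightwards. A minimal standalone sketch of that traversal (plain C++ with hypothetical names, not the YDB API; the two size checks stand in for the merge-target and 0.9-of-split-threshold limits seen above):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Stand-in for TTableInfo::TryAddShardToMerge: accept the partition while the
// accumulated size stays within the merge target and safely below the split size.
static bool TryAdd(const std::vector<uint64_t>& sizes, size_t idx,
                   std::vector<size_t>& run, uint64_t& total,
                   uint64_t sizeToMerge, uint64_t sizeToSplit) {
    if (total + sizes[idx] > sizeToMerge)
        return false;                                  // merged shard must stay small
    if (double(total + sizes[idx]) >= double(sizeToSplit) * 0.9)
        return false;                                  // and must not immediately re-split
    total += sizes[idx];
    run.push_back(idx);
    return true;
}

// Mirrors the traversal order of CheckCanMergePartitions.
std::vector<size_t> CollectMergeRun(const std::vector<uint64_t>& sizes, size_t start,
                                    uint64_t sizeToMerge, uint64_t sizeToSplit) {
    std::vector<size_t> run;
    uint64_t total = 0;
    // The triggering shard itself must qualify first.
    if (!TryAdd(sizes, start, run, total, sizeToMerge, sizeToSplit))
        return {};
    for (size_t i = start; i-- > 0; )                  // extend to the left
        if (!TryAdd(sizes, i, run, total, sizeToMerge, sizeToSplit))
            break;
    std::reverse(run.begin(), run.end());              // restore ascending key order
    for (size_t i = start + 1; i < sizes.size(); ++i)  // extend to the right
        if (!TryAdd(sizes, i, run, total, sizeToMerge, sizeToSplit))
            break;
    return run;                                        // caller merges if run.size() > 1
}
```

In this sketch, `CollectMergeRun({100, 200, 300, 5000}, 1, 1024, 2048)` would return the run {0, 1, 2}: the fourth partition stops the rightward extension because adding it would exceed both limits.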
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 041bd035bf5..b08827004f9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -33,6 +33,11 @@
 namespace NKikimr {
 namespace NSchemeShard {
 
+struct TForceShardSplitSettings {
+    ui64 ForceShardSplitDataSize;
+    bool DisableForceShardSplit;
+};
+
 struct TSplitSettings {
     TControlWrapper SplitMergePartCountLimit;
     TControlWrapper FastSplitSizeThreshold;
@@ -42,6 +47,8 @@ struct TSplitSettings {
     TControlWrapper SplitByLoadMaxShardsDefault;
     TControlWrapper MergeByLoadMinUptimeSec;
     TControlWrapper MergeByLoadMinLowLoadDurationSec;
+    TControlWrapper ForceShardSplitDataSize;
+    TControlWrapper DisableForceShardSplit;
 
     TSplitSettings()
         : SplitMergePartCountLimit(2000, -1, 1000000)
@@ -52,6 +59,8 @@ struct TSplitSettings {
         , SplitByLoadMaxShardsDefault(50, 0, 10000)
         , MergeByLoadMinUptimeSec(10*60, 0, 4ll*1000*1000*1000)
         , MergeByLoadMinLowLoadDurationSec(1*60*60, 0, 4ll*1000*1000*1000)
+        , ForceShardSplitDataSize(2ULL * 1024 * 1024 * 1024, 10 * 1024 * 1024, 16ULL * 1024 * 1024 * 1024)
+        , DisableForceShardSplit(0, 0, 1)
     {}
 
     void Register(TIntrusivePtr<NKikimr::TControlBoard>& icb) {
@@ -64,6 +73,16 @@ struct TSplitSettings {
         icb->RegisterSharedControl(SplitByLoadMaxShardsDefault, "SchemeShard_SplitByLoadMaxShardsDefault");
         icb->RegisterSharedControl(MergeByLoadMinUptimeSec, "SchemeShard_MergeByLoadMinUptimeSec");
         icb->RegisterSharedControl(MergeByLoadMinLowLoadDurationSec,"SchemeShard_MergeByLoadMinLowLoadDurationSec");
+
+        icb->RegisterSharedControl(ForceShardSplitDataSize, "SchemeShardControls.ForceShardSplitDataSize");
+        icb->RegisterSharedControl(DisableForceShardSplit, "SchemeShardControls.DisableForceShardSplit");
+    }
+
+    TForceShardSplitSettings GetForceShardSplitSettings() const {
+        return TForceShardSplitSettings{
+            .ForceShardSplitDataSize = ui64(ForceShardSplitDataSize),
+            .DisableForceShardSplit = ui64(DisableForceShardSplit) != 0,
+        };
     }
 };
 
@@ -557,22 +576,30 @@ public:
         return ExpectedPartitionCount;
     }
 
-    bool TryAddShardToMerge(const TSplitSettings& splitSettings, TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge,
+    bool TryAddShardToMerge(const TSplitSettings& splitSettings,
+                            const TForceShardSplitSettings& forceShardSplitSettings,
+                            TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge,
                             THashSet<TTabletId>& partOwners, ui64& totalSize, float& totalLoad) const;
 
-    bool CheckCanMergePartitions(const TSplitSettings& splitSettings, TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge) const;
+    bool CheckCanMergePartitions(const TSplitSettings& splitSettings,
+                                 const TForceShardSplitSettings& forceShardSplitSettings,
+                                 TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge) const;
 
     bool CheckFastSplitForPartition(const TSplitSettings& splitSettings, TShardIdx shardIdx, ui64 dataSize, ui64 rowCount) const;
     bool CheckSplitByLoad(const TSplitSettings& splitSettings, TShardIdx shardIdx, ui64 dataSize, ui64 rowCount) const;
 
-    bool IsSplitBySizeEnabled() const {
+    bool IsSplitBySizeEnabled(const TForceShardSplitSettings& params) const {
+        // Respect unspecified SizeToSplit when force shard splits are disabled
+        if (params.DisableForceShardSplit && PartitionConfig().GetPartitioningPolicy().GetSizeToSplit() == 0) {
+            return false;
+        }
         // Auto split is always enabled, unless table is using external blobs
         return !PartitionConfigHasExternalBlobsEnabled(PartitionConfig());
     }
 
-    bool IsMergeBySizeEnabled() const {
+    bool IsMergeBySizeEnabled(const TForceShardSplitSettings& params) const {
         // Auto merge is only enabled when auto split is also enabled
-        if (!IsSplitBySizeEnabled()) {
+        if (!IsSplitBySizeEnabled(params)) {
             return false;
         }
         // We want auto merge enabled when user has explicitly specified the
@@ -585,7 +612,7 @@
         // We also want auto merge enabled when table has more shards than the
         // specified maximum number of partitions. This way when something
         // splits by size over the limit we merge some smaller partitions.
-        return Partitions.size() > GetMaxPartitionsCount();
+        return Partitions.size() > GetMaxPartitionsCount() && !params.DisableForceShardSplit;
     }
 
     bool IsSplitByLoadEnabled() const {
@@ -596,27 +623,29 @@
         return IsSplitByLoadEnabled();
     }
 
-    static ui64 GetShardForceSizeToSplit() {
-        return 2ULL * 1024 * 1024 * 1024; // 2GiB
-    }
-
-    ui64 GetShardSizeToSplit() const {
-        if (!IsSplitBySizeEnabled()) {
+    ui64 GetShardSizeToSplit(const TForceShardSplitSettings& params) const {
+        if (!IsSplitBySizeEnabled(params)) {
             return Max<ui64>();
         }
         ui64 threshold = PartitionConfig().GetPartitioningPolicy().GetSizeToSplit();
-        if (threshold == 0 || threshold >= GetShardForceSizeToSplit()) {
-            return GetShardForceSizeToSplit();
+        if (params.DisableForceShardSplit) {
+            if (threshold == 0) {
+                return Max<ui64>();
+            }
+        } else {
+            if (threshold == 0 || threshold >= params.ForceShardSplitDataSize) {
+                return params.ForceShardSplitDataSize;
+            }
         }
         return threshold;
     }
 
-    ui64 GetSizeToMerge() const {
-        if (!IsMergeBySizeEnabled()) {
+    ui64 GetSizeToMerge(const TForceShardSplitSettings& params) const {
+        if (!IsMergeBySizeEnabled(params)) {
             // Disable auto-merge by default
             return 0;
         } else {
-            return GetShardSizeToSplit() / 2;
+            return GetShardSizeToSplit(params) / 2;
         }
     }
 
@@ -630,24 +659,24 @@
         return val == 0 ? 32*1024 : val;
     }
 
-    bool IsForceSplitBySizeShardIdx(TShardIdx shardIdx) const {
-        if (!Stats.PartitionStats.contains(shardIdx)) {
+    bool IsForceSplitBySizeShardIdx(TShardIdx shardIdx, const TForceShardSplitSettings& params) const {
+        if (!Stats.PartitionStats.contains(shardIdx) || params.DisableForceShardSplit) {
             return false;
         }
         const auto& stats = Stats.PartitionStats.at(shardIdx);
-        return stats.DataSize >= GetShardForceSizeToSplit();
+        return stats.DataSize >= params.ForceShardSplitDataSize;
     }
 
-    bool ShouldSplitBySize(ui64 dataSize) const {
-        if (!IsSplitBySizeEnabled()) {
+    bool ShouldSplitBySize(ui64 dataSize, const TForceShardSplitSettings& params) const {
+        if (!IsSplitBySizeEnabled(params)) {
             return false;
         }
         // When shard is over the maximum size we split even when over max partitions
-        if (dataSize >= GetShardForceSizeToSplit()) {
+        if (dataSize >= params.ForceShardSplitDataSize && !params.DisableForceShardSplit) {
             return true;
         }
         // Otherwise we split when we may add one more partition
-        return Partitions.size() < GetMaxPartitionsCount() && dataSize >= GetShardSizeToSplit();
+        return Partitions.size() < GetMaxPartitionsCount() && dataSize >= GetShardSizeToSplit(params);
     }
 
     bool NeedRecreateParts() const {
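
Taken together, the header changes reduce to a small decision table for the effective split threshold. A condensed standalone sketch of that logic (plain C++ mirroring GetShardSizeToSplit and GetSizeToMerge above; EffectiveSplitSize and MergeTarget are names invented here, and the external-blobs and max-partitions checks are omitted):

```cpp
#include <cstdint>
#include <limits>

struct TForceShardSplitSettings {
    uint64_t ForceShardSplitDataSize;  // ICB control, 2 GiB by default
    bool DisableForceShardSplit;       // ICB control, off by default
};

// Condenses TTableInfo::GetShardSizeToSplit: userSizeToSplit is the table's
// PartitioningPolicy.SizeToSplit, where 0 means "not set by the user".
uint64_t EffectiveSplitSize(uint64_t userSizeToSplit, const TForceShardSplitSettings& p) {
    if (p.DisableForceShardSplit) {
        // Forced splits off: an unset user threshold disables splitting by size.
        return userSizeToSplit == 0 ? std::numeric_limits<uint64_t>::max()
                                    : userSizeToSplit;
    }
    // Forced splits on: the forced limit fills in an unset user threshold and
    // caps a user threshold that exceeds it.
    if (userSizeToSplit == 0 || userSizeToSplit >= p.ForceShardSplitDataSize)
        return p.ForceShardSplitDataSize;
    return userSizeToSplit;
}

// Condenses GetSizeToMerge: shards merge back toward half the split threshold
// (the real method first checks that merging by size is enabled at all).
uint64_t MergeTarget(uint64_t userSizeToSplit, const TForceShardSplitSettings& p) {
    return EffectiveSplitSize(userSizeToSplit, p) / 2;
}
```

With the defaults (SizeToSplit unset, ForceShardSplitDataSize = 2 GiB) a shard splits at 2 GiB and neighbors merge while their combined size stays under 1 GiB; raising the control toward its 16 GiB maximum lets shards grow correspondingly larger, while setting DisableForceShardSplit = 1 for a table with no explicit SizeToSplit stops size-based splitting for it entirely.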
unspecified SizeToSplit when force shard splits are disabled + if (params.DisableForceShardSplit && PartitionConfig().GetPartitioningPolicy().GetSizeToSplit() == 0) { + return false; + } // Auto split is always enabled, unless table is using external blobs return !PartitionConfigHasExternalBlobsEnabled(PartitionConfig()); } - bool IsMergeBySizeEnabled() const { + bool IsMergeBySizeEnabled(const TForceShardSplitSettings& params) const { // Auto merge is only enabled when auto split is also enabled - if (!IsSplitBySizeEnabled()) { + if (!IsSplitBySizeEnabled(params)) { return false; } // We want auto merge enabled when user has explicitly specified the @@ -585,7 +612,7 @@ public: // We also want auto merge enabled when table has more shards than the // specified maximum number of partitions. This way when something // splits by size over the limit we merge some smaller partitions. - return Partitions.size() > GetMaxPartitionsCount(); + return Partitions.size() > GetMaxPartitionsCount() && !params.DisableForceShardSplit; } bool IsSplitByLoadEnabled() const { @@ -596,27 +623,29 @@ public: return IsSplitByLoadEnabled(); } - static ui64 GetShardForceSizeToSplit() { - return 2ULL * 1024 * 1024 * 1024; // 2GiB - } - - ui64 GetShardSizeToSplit() const { - if (!IsSplitBySizeEnabled()) { + ui64 GetShardSizeToSplit(const TForceShardSplitSettings& params) const { + if (!IsSplitBySizeEnabled(params)) { return Max<ui64>(); } ui64 threshold = PartitionConfig().GetPartitioningPolicy().GetSizeToSplit(); - if (threshold == 0 || threshold >= GetShardForceSizeToSplit()) { - return GetShardForceSizeToSplit(); + if (params.DisableForceShardSplit) { + if (threshold == 0) { + return Max<ui64>(); + } + } else { + if (threshold == 0 || threshold >= params.ForceShardSplitDataSize) { + return params.ForceShardSplitDataSize; + } } return threshold; } - ui64 GetSizeToMerge() const { - if (!IsMergeBySizeEnabled()) { + ui64 GetSizeToMerge(const TForceShardSplitSettings& params) const { + if (!IsMergeBySizeEnabled(params)) { // Disable auto-merge by default return 0; } else { - return GetShardSizeToSplit() / 2; + return GetShardSizeToSplit(params) / 2; } } @@ -630,24 +659,24 @@ public: return val == 0 ? 32*1024 : val; } - bool IsForceSplitBySizeShardIdx(TShardIdx shardIdx) const { - if (!Stats.PartitionStats.contains(shardIdx)) { + bool IsForceSplitBySizeShardIdx(TShardIdx shardIdx, const TForceShardSplitSettings& params) const { + if (!Stats.PartitionStats.contains(shardIdx) || params.DisableForceShardSplit) { return false; } const auto& stats = Stats.PartitionStats.at(shardIdx); - return stats.DataSize >= GetShardForceSizeToSplit(); + return stats.DataSize >= params.ForceShardSplitDataSize; } - bool ShouldSplitBySize(ui64 dataSize) const { - if (!IsSplitBySizeEnabled()) { + bool ShouldSplitBySize(ui64 dataSize, const TForceShardSplitSettings& params) const { + if (!IsSplitBySizeEnabled(params)) { return false; } // When shard is over the maximum size we split even when over max partitions - if (dataSize >= GetShardForceSizeToSplit()) { + if (dataSize >= params.ForceShardSplitDataSize && !params.DisableForceShardSplit) { return true; } // Otherwise we split when we may add one more partition - return Partitions.size() < GetMaxPartitionsCount() && dataSize >= GetShardSizeToSplit(); + return Partitions.size() < GetMaxPartitionsCount() && dataSize >= GetShardSizeToSplit(params); } bool NeedRecreateParts() const { |