summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Aleksei Borzenkov <[email protected]> 2022-06-09 18:31:51 +0300
committer Aleksei Borzenkov <[email protected]> 2022-06-09 18:31:51 +0300
commit 523dd687ec846447ca07d38c12ed29a85479cf68 (patch)
tree 8eed926f471f22c3e4eaf5c59a2990e23ccf7148
parent 0cfac1cb0d218fe93b0417b0663a9a876fa71e67 (diff)
Configure force shard split with icb, KIKIMR-15059
ref:a3c4cadd1169fb56856454e2cd331e6a8c43be87
-rw-r--r-- ydb/core/protos/config.proto                                   | 14
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp |  3
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard__table_stats.cpp           |  5
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard__table_stats_histogram.cpp |  3
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_info_types.cpp             | 16
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_info_types.h               | 77
6 files changed, 84 insertions, 34 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index c324d24be15..05c9e4e7fc4 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1277,9 +1277,23 @@ message TImmediateControlsConfig {
DefaultValue: 250000 }];
}
+ message TSchemeShardControls {
+ optional uint64 ForceShardSplitDataSize = 1 [(ControlOptions) = {
+ Description: "Forces shards to split when reaching the given data size (2 GiB by default)",
+ MinValue: 10485760, // 10 MiB
+ MaxValue: 17179869184, // 16 GiB
+ DefaultValue: 2147483648 }];
+ optional uint64 DisableForceShardSplit = 2 [(ControlOptions) = {
+ Description: "Disables forced shard splits, for special cases only",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0 }];
+ }
+
optional TDataShardControls DataShardControls = 1;
optional TTxLimitControls TxLimitControls = 2;
optional TCoordinatorControls CoordinatorControls = 3;
+ optional TSchemeShardControls SchemeShardControls = 4;
};
message TMeteringConfig {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
index 055525640ff..6b1b99425e0 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
@@ -651,9 +651,10 @@ public:
}
auto srcShardIdx = tableInfo->GetPartitions()[srcPartitionIdx].ShardIdx;
+ const auto forceShardSplitSettings = context.SS->SplitSettings.GetForceShardSplitSettings();
if (tableInfo->GetExpectedPartitionCount() + count - 1 > tableInfo->GetMaxPartitionsCount() &&
- !tableInfo->IsForceSplitBySizeShardIdx(srcShardIdx))
+ !tableInfo->IsForceSplitBySizeShardIdx(srcShardIdx, forceShardSplitSettings))
{
errStr = "Reached MaxPartitionsCount limit: " + ToString(tableInfo->GetMaxPartitionsCount());
return false;
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index 065836b85d1..66100f74349 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -141,6 +141,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
}
auto shardIdx = Self->TabletIdToShardIdx[datashardId];
+ const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings();
TTableInfo::TPartitionStats newStats;
newStats.SeqNo = TMessageSeqNo(rec.GetGeneration(), rec.GetRound());
@@ -258,7 +259,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
}
TVector<TShardIdx> shardsToMerge;
- if (table->CheckCanMergePartitions(Self->SplitSettings, shardIdx, shardsToMerge)) {
+ if (table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge)) {
TTxId txId = Self->GetCachedTxId(ctx);
if (!txId) {
@@ -291,7 +292,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
ui64 dataSizeResolution = 0; // Datashard will use default resolution
ui64 rowCountResolution = 0; // Datashard will use default resolution
bool collectKeySample = false;
- if (table->ShouldSplitBySize(dataSize)) {
+ if (table->ShouldSplitBySize(dataSize, forceShardSplitSettings)) {
// We would like to split by size and do this no matter how many partitions there are
} else if (table->GetPartitions().size() >= table->GetMaxPartitionsCount()) {
// We cannot split as there are max partitions already
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats_histogram.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats_histogram.cpp
index 4f15001413c..0c5adea88cc 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats_histogram.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats_histogram.cpp
@@ -308,9 +308,10 @@ bool TTxPartitionHistogram::Execute(TTransactionContext& txc, const TActorContex
return true;
auto shardIdx = Self->TabletIdToShardIdx[datashardId];
+ const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings();
ESplitReason splitReason = ESplitReason::NO_SPLIT;
- if (table->ShouldSplitBySize(dataSize)) {
+ if (table->ShouldSplitBySize(dataSize, forceShardSplitSettings)) {
splitReason = ESplitReason::SPLIT_BY_SIZE;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index 9057806cb87..ebd7d4aa67a 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -1419,6 +1419,7 @@ void TTableInfo::FinishSplitMergeOp(TOperationId opId) {
bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings,
+ const TForceShardSplitSettings& forceShardSplitSettings,
TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge,
THashSet<TTabletId>& partOwners, ui64& totalSize, float& totalLoad) const
{
@@ -1451,7 +1452,7 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings,
bool canMerge = false;
// Check if we can try merging by size
- if (IsMergeBySizeEnabled() && stats->DataSize + totalSize <= GetSizeToMerge()) {
+ if (IsMergeBySizeEnabled(forceShardSplitSettings) && stats->DataSize + totalSize <= GetSizeToMerge(forceShardSplitSettings)) {
canMerge = true;
}
@@ -1466,7 +1467,7 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings,
return false;
// Check that total size doesn't exceed the limits
- if (IsSplitBySizeEnabled() && stats->DataSize + totalSize >= GetShardSizeToSplit()*0.9) {
+ if (IsSplitBySizeEnabled(forceShardSplitSettings) && stats->DataSize + totalSize >= GetShardSizeToSplit(forceShardSplitSettings)*0.9) {
return false;
}
@@ -1500,7 +1501,10 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings,
return true;
}
-bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings, TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge) const {
+bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings,
+ const TForceShardSplitSettings& forceShardSplitSettings,
+ TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge) const
+{
// Don't split/merge backup tables
if (IsBackup) {
return false;
@@ -1527,12 +1531,12 @@ bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings, TS
THashSet<TTabletId> partOwners;
// Make sure we can actually merge current shard first
- if (!TryAddShardToMerge(splitSettings, shardIdx, shardsToMerge, partOwners, totalSize, totalLoad)) {
+ if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, partOwners, totalSize, totalLoad)) {
return false;
}
for (i64 pi = partitionIdx - 1; pi >= 0; --pi) {
- if (!TryAddShardToMerge(splitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad)) {
+ if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad)) {
break;
}
}
@@ -1540,7 +1544,7 @@ bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings, TS
Reverse(shardsToMerge.begin(), shardsToMerge.end());
for (ui64 pi = partitionIdx + 1; pi < GetPartitions().size(); ++pi) {
- if (!TryAddShardToMerge(splitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad)) {
+ if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad)) {
break;
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 041bd035bf5..b08827004f9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -33,6 +33,11 @@
namespace NKikimr {
namespace NSchemeShard {
+struct TForceShardSplitSettings {
+ ui64 ForceShardSplitDataSize;
+ bool DisableForceShardSplit;
+};
+
struct TSplitSettings {
TControlWrapper SplitMergePartCountLimit;
TControlWrapper FastSplitSizeThreshold;
@@ -42,6 +47,8 @@ struct TSplitSettings {
TControlWrapper SplitByLoadMaxShardsDefault;
TControlWrapper MergeByLoadMinUptimeSec;
TControlWrapper MergeByLoadMinLowLoadDurationSec;
+ TControlWrapper ForceShardSplitDataSize;
+ TControlWrapper DisableForceShardSplit;
TSplitSettings()
: SplitMergePartCountLimit(2000, -1, 1000000)
@@ -52,6 +59,8 @@ struct TSplitSettings {
, SplitByLoadMaxShardsDefault(50, 0, 10000)
, MergeByLoadMinUptimeSec(10*60, 0, 4ll*1000*1000*1000)
, MergeByLoadMinLowLoadDurationSec(1*60*60, 0, 4ll*1000*1000*1000)
+ , ForceShardSplitDataSize(2ULL * 1024 * 1024 * 1024, 10 * 1024 * 1024, 16ULL * 1024 * 1024 * 1024)
+ , DisableForceShardSplit(0, 0, 1)
{}
void Register(TIntrusivePtr<NKikimr::TControlBoard>& icb) {
@@ -64,6 +73,16 @@ struct TSplitSettings {
icb->RegisterSharedControl(SplitByLoadMaxShardsDefault, "SchemeShard_SplitByLoadMaxShardsDefault");
icb->RegisterSharedControl(MergeByLoadMinUptimeSec, "SchemeShard_MergeByLoadMinUptimeSec");
icb->RegisterSharedControl(MergeByLoadMinLowLoadDurationSec,"SchemeShard_MergeByLoadMinLowLoadDurationSec");
+
+ icb->RegisterSharedControl(ForceShardSplitDataSize, "SchemeShardControls.ForceShardSplitDataSize");
+ icb->RegisterSharedControl(DisableForceShardSplit, "SchemeShardControls.DisableForceShardSplit");
+ }
+
+ TForceShardSplitSettings GetForceShardSplitSettings() const {
+ return TForceShardSplitSettings{
+ .ForceShardSplitDataSize = ui64(ForceShardSplitDataSize),
+ .DisableForceShardSplit = ui64(DisableForceShardSplit) != 0,
+ };
}
};
@@ -557,22 +576,30 @@ public:
return ExpectedPartitionCount;
}
- bool TryAddShardToMerge(const TSplitSettings& splitSettings, TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge,
+ bool TryAddShardToMerge(const TSplitSettings& splitSettings,
+ const TForceShardSplitSettings& forceShardSplitSettings,
+ TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge,
THashSet<TTabletId>& partOwners, ui64& totalSize, float& totalLoad) const;
- bool CheckCanMergePartitions(const TSplitSettings& splitSettings, TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge) const;
+ bool CheckCanMergePartitions(const TSplitSettings& splitSettings,
+ const TForceShardSplitSettings& forceShardSplitSettings,
+ TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge) const;
bool CheckFastSplitForPartition(const TSplitSettings& splitSettings, TShardIdx shardIdx, ui64 dataSize, ui64 rowCount) const;
bool CheckSplitByLoad(const TSplitSettings& splitSettings, TShardIdx shardIdx, ui64 dataSize, ui64 rowCount) const;
- bool IsSplitBySizeEnabled() const {
+ bool IsSplitBySizeEnabled(const TForceShardSplitSettings& params) const {
+ // Respect unspecified SizeToSplit when force shard splits are disabled
+ if (params.DisableForceShardSplit && PartitionConfig().GetPartitioningPolicy().GetSizeToSplit() == 0) {
+ return false;
+ }
// Auto split is always enabled, unless table is using external blobs
return !PartitionConfigHasExternalBlobsEnabled(PartitionConfig());
}
- bool IsMergeBySizeEnabled() const {
+ bool IsMergeBySizeEnabled(const TForceShardSplitSettings& params) const {
// Auto merge is only enabled when auto split is also enabled
- if (!IsSplitBySizeEnabled()) {
+ if (!IsSplitBySizeEnabled(params)) {
return false;
}
// We want auto merge enabled when user has explicitly specified the
@@ -585,7 +612,7 @@ public:
// We also want auto merge enabled when table has more shards than the
// specified maximum number of partitions. This way when something
// splits by size over the limit we merge some smaller partitions.
- return Partitions.size() > GetMaxPartitionsCount();
+ return Partitions.size() > GetMaxPartitionsCount() && !params.DisableForceShardSplit;
}
bool IsSplitByLoadEnabled() const {
@@ -596,27 +623,29 @@ public:
return IsSplitByLoadEnabled();
}
- static ui64 GetShardForceSizeToSplit() {
- return 2ULL * 1024 * 1024 * 1024; // 2GiB
- }
-
- ui64 GetShardSizeToSplit() const {
- if (!IsSplitBySizeEnabled()) {
+ ui64 GetShardSizeToSplit(const TForceShardSplitSettings& params) const {
+ if (!IsSplitBySizeEnabled(params)) {
return Max<ui64>();
}
ui64 threshold = PartitionConfig().GetPartitioningPolicy().GetSizeToSplit();
- if (threshold == 0 || threshold >= GetShardForceSizeToSplit()) {
- return GetShardForceSizeToSplit();
+ if (params.DisableForceShardSplit) {
+ if (threshold == 0) {
+ return Max<ui64>();
+ }
+ } else {
+ if (threshold == 0 || threshold >= params.ForceShardSplitDataSize) {
+ return params.ForceShardSplitDataSize;
+ }
}
return threshold;
}
- ui64 GetSizeToMerge() const {
- if (!IsMergeBySizeEnabled()) {
+ ui64 GetSizeToMerge(const TForceShardSplitSettings& params) const {
+ if (!IsMergeBySizeEnabled(params)) {
// Disable auto-merge by default
return 0;
} else {
- return GetShardSizeToSplit() / 2;
+ return GetShardSizeToSplit(params) / 2;
}
}
@@ -630,24 +659,24 @@ public:
return val == 0 ? 32*1024 : val;
}
- bool IsForceSplitBySizeShardIdx(TShardIdx shardIdx) const {
- if (!Stats.PartitionStats.contains(shardIdx)) {
+ bool IsForceSplitBySizeShardIdx(TShardIdx shardIdx, const TForceShardSplitSettings& params) const {
+ if (!Stats.PartitionStats.contains(shardIdx) || params.DisableForceShardSplit) {
return false;
}
const auto& stats = Stats.PartitionStats.at(shardIdx);
- return stats.DataSize >= GetShardForceSizeToSplit();
+ return stats.DataSize >= params.ForceShardSplitDataSize;
}
- bool ShouldSplitBySize(ui64 dataSize) const {
- if (!IsSplitBySizeEnabled()) {
+ bool ShouldSplitBySize(ui64 dataSize, const TForceShardSplitSettings& params) const {
+ if (!IsSplitBySizeEnabled(params)) {
return false;
}
// When shard is over the maximum size we split even when over max partitions
- if (dataSize >= GetShardForceSizeToSplit()) {
+ if (dataSize >= params.ForceShardSplitDataSize && !params.DisableForceShardSplit) {
return true;
}
// Otherwise we split when we may add one more partition
- return Partitions.size() < GetMaxPartitionsCount() && dataSize >= GetShardSizeToSplit();
+ return Partitions.size() < GetMaxPartitionsCount() && dataSize >= GetShardSizeToSplit(params);
}
bool NeedRecreateParts() const {