author     kruall <kruall@ydb.tech>      2024-11-20 12:52:26 +0300
committer  GitHub <noreply@github.com>   2024-11-20 12:52:26 +0300
commit     2058d8c118468d2bd6a31116b51f5d91b1048991 (patch)
tree       5507b404634e1ae49a4e43290ab68eeb7d8621d3
parent     2cf848f3e1004307b2b619c0263c6d7379acf7f4 (diff)
download   ydb-2058d8c118468d2bd6a31116b51f5d91b1048991.tar.gz
Add group_slots option to balance in ydb-dstool (#11589)
-rw-r--r--  ydb/apps/dstool/lib/common.py                       12
-rw-r--r--  ydb/apps/dstool/lib/dstool_cmd_cluster_balance.py  803
2 files changed, 583 insertions, 232 deletions
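
A hypothetical invocation of the new balancing mode (the "cluster balance" subcommand
spelling is inferred from the file name dstool_cmd_cluster_balance.py, and the endpoint
and pool placeholders are illustrative, not taken from this diff):

    ydb-dstool -e <endpoint> cluster balance --sort-by group_slots \
        --storage-pool <pool-name> --max-donors-per-pdisk 1 --time-between-reassignings 5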
diff --git a/ydb/apps/dstool/lib/common.py b/ydb/apps/dstool/lib/common.py
index fbaa41fb96..9f7b539126 100644
--- a/ydb/apps/dstool/lib/common.py
+++ b/ydb/apps/dstool/lib/common.py
@@ -201,7 +201,8 @@ class ConnectionParams:
self.parse_token(args.token_file)
self.domain = 1
- self.verbose = args.verbose
+ self.verbose = args.verbose or args.debug
+ self.debug = args.debug
self.quiet = args.quiet
self.http_timeout = args.http_timeout
self.cafile = args.cafile
@@ -210,6 +211,7 @@ class ConnectionParams:
def add_host_access_options(self, parser, with_endpoint=True):
self.parser = parser
parser.add_argument('--verbose', '-v', action='store_true', help='Be verbose during operation')
+ parser.add_argument('--debug', '-d', action='store_true', help='Be very verbose during operation')
parser.add_argument('--quiet', '-q', action='store_true', help="Don't show non-vital messages")
g = parser.add_argument_group('Server access options')
if with_endpoint:
@@ -440,7 +442,7 @@ def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True
if endpoint.protocol not in ('http', 'https'):
endpoint = connection_params.make_endpoint_info(f'{connection_params.mon_protocol}://{endpoint.host_with_mon_port}')
url = connection_params.make_url(endpoint, path, params)
- if connection_params.verbose:
+ if connection_params.debug:
print('INFO: fetching %s' % url, file=sys.stderr)
request = urllib.request.Request(url, data=data, method=method)
if connection_params.token and url.startswith('http'):
@@ -467,7 +469,7 @@ def invoke_grpc(func, *params, explicit_host=None, endpoint=None):
options = [
('grpc.max_receive_message_length', 256 << 20), # 256 MiB
]
- if connection_params.verbose:
+ if connection_params.debug:
p = ', '.join('<<< %s >>>' % text_format.MessageToString(param, as_one_line=True) for param in params)
print('INFO: issuing %s(%s) @%s:%d protocol %s' % (func, p, endpoint.host, endpoint.grpc_port,
endpoint.protocol), file=sys.stderr)
@@ -476,11 +478,11 @@ def invoke_grpc(func, *params, explicit_host=None, endpoint=None):
try:
stub = kikimr_grpc.TGRpcServerStub(channel)
res = getattr(stub, func)(*params)
- if connection_params.verbose:
+ if connection_params.debug:
print('INFO: result <<< %s >>>' % text_format.MessageToString(res, as_one_line=True), file=sys.stderr)
return res
except Exception as e:
- if connection_params.verbose:
+ if connection_params.debug:
print('ERROR: exception %s' % e, file=sys.stderr)
raise ConnectionError("Can't connect to specified addresses by gRPC protocol")
diff --git a/ydb/apps/dstool/lib/dstool_cmd_cluster_balance.py b/ydb/apps/dstool/lib/dstool_cmd_cluster_balance.py
index dd85a51ab4..0aa3831965 100644
--- a/ydb/apps/dstool/lib/dstool_cmd_cluster_balance.py
+++ b/ydb/apps/dstool/lib/dstool_cmd_cluster_balance.py
@@ -7,262 +7,611 @@ from collections import defaultdict, Counter
description = 'Move vdisks out from overpopulated pdisks.'
+class Constants:
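+    # Defaults; apply_args() lets --waiting-time and --time-between-reassignings override them.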
+ WAITING_TIME = 15
+    TIME_BETWEEN_REASSIGNINGS = 0
+
+ @classmethod
+ def apply_args(cls, args):
+ if args.waiting_time:
+ cls.WAITING_TIME = args.waiting_time
+ if args.time_between_reassignings:
+            cls.TIME_BETWEEN_REASSIGNINGS = args.time_between_reassignings
+
+
def add_options(p):
p.add_argument('--max-replicating-pdisks', type=int, help='Limit number of maximum replicating PDisks in the cluster')
p.add_argument('--only-from-overpopulated-pdisks', action='store_true', help='Move vdisks out only from pdisks with over expected slot count')
- p.add_argument('--sort-by', choices=['slots', 'space_ratio'], default='slots', help='First to reassign disks with the most slots or with the highest space ratio')
- p.add_argument('--storage-pool', type=str, help='Storage pool to balance')
+ p.add_argument(
+ '--sort-by',
+ choices=['slots', 'space_ratio', 'group_slots'],
+ default='slots',
+        help='Reassign first from disks with the most slots, the highest space ratio, or the most vslots in the group'
+ )
+ g = p.add_mutually_exclusive_group()
+ g.add_argument('--storage-pool', type=str, help='Storage pool to balance')
+ common.add_group_ids_option(g)
p.add_argument('--max-donors-per-pdisk', type=int, default=0, help='Limit number of donors per pdisk')
+    p.add_argument('--allow-same-node', action='store_true', help='Allow placing a vdisk on a node that already hosts another vdisk of the same group')
+ p.add_argument('--waiting-time', type=int, default=Constants.WAITING_TIME, help='Time to wait when there are no vdisks to reassign')
+    p.add_argument('--time-between-reassignings', type=int, default=Constants.TIME_BETWEEN_REASSIGNINGS, help='Time in seconds to wait between successive reassignments')
common.add_basic_format_options(p)
-def build_pdisk_statistics(base_config, pdisk_map, vsolts):
- pdisks_statistics = {
- pdisk_id: {
- "PDiskId": pdisk_id,
- "AvailableSize": pdisk.PDiskMetrics.AvailableSize,
- "TotalSize": pdisk.PDiskMetrics.TotalSize,
- "CandidateVSlots": [],
- "DonorVSlots": [],
+class ClusterInfo:
+ def __init__(self):
+ self.base_config = None
+ self.storage_pools = None
+ self.node_mon_map = None
+ self.vslot_map = None
+ self.pdisk_map = None
+ self.pdisk_usage = None
+ self.storage_pool_names_map = None
+ self.group_id_to_storage_pool_name_map = None
+ self.vdisks_groups_count_map = None
+ self.replicating_pdisks = None
+ self.pdisk_usage_w_donors = None
+
+ @staticmethod
+ def collect_cluster_info(count_replicating_pdisks=False):
+ info = ClusterInfo()
+ info.base_config = common.fetch_base_config()
+ info.storage_pools = common.fetch_storage_pools()
+ info.node_mon_map = common.fetch_node_mon_map({vslot.VSlotId.NodeId for vslot in info.base_config.VSlot})
+ info.vslot_map = common.build_vslot_map(info.base_config)
+ info.pdisk_map = common.build_pdisk_map(info.base_config)
+ info.pdisk_usage = common.build_pdisk_usage_map(info.base_config, count_donors=False)
+ info.pdisk_usage_w_donors = common.build_pdisk_usage_map(info.base_config, count_donors=True)
+
+ info.storage_pool_names_map = common.build_storage_pool_names_map(info.storage_pools)
+ info.group_id_to_storage_pool_name_map = {
+ group_id: info.storage_pool_names_map[(group.BoxId, group.StoragePoolId)]
+ for group_id, group in common.build_group_map(info.base_config).items()
+ if (group.BoxId, group.StoragePoolId) != (0, 0) # static group
}
- for pdisk_id, pdisk in pdisk_map.items()
- if pdisk.PDiskMetrics.TotalSize > 0 # pdisk works
- }
- for vslot in vsolts:
- pdisk_id = common.get_pdisk_id(vslot.VSlotId)
- pdisks_statistics[pdisk_id]["CandidateVSlots"].append(vslot)
- for vslot in base_config.VSlot:
- for donor in vslot.Donors:
- pdisk_id = common.get_pdisk_id(donor.VSlotId)
- pdisks_statistics[pdisk_id]["DonorVSlots"].append(donor)
- return pdisks_statistics
+ info.vdisks_groups_count_map = defaultdict(int)
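+        # num = READY vslots minus total vslots of the group, i.e. -(count of non-READY vslots)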
+ for group in info.base_config.Group:
+ num = sum(vslot.Status == 'READY' for vslot in common.vslots_of_group(group, info.vslot_map)) - len(group.VSlotId)
+ info.vdisks_groups_count_map[num] += 1
+ return info
+
+ def list_replicating_pdisks(self):
+ if self.replicating_pdisks is not None:
+ return self.replicating_pdisks
+
+ self.replicating_pdisks = set()
+ for vslot in self.base_config.VSlot:
+ if vslot.Status != 'READY' and vslot.Status != 'ERROR':
+ self.replicating_pdisks.add(common.get_pdisk_id(vslot.VSlotId))
+ return self.replicating_pdisks
+
+
+class GroupsInfo:
+ def __init__(self):
+ self.all_groups = None
+ self.healthy_groups = None
+        self.unhealthy_groups = None
+        self.healthy_vslots = None
+
+ @staticmethod
+ def collect_groups_info(cluster_info):
+ groups_info = GroupsInfo()
+ groups_info.all_groups = common.select_groups(cluster_info.base_config)
+ groups_info.healthy_groups = common.filter_healthy_groups(groups_info.all_groups, cluster_info.node_mon_map, cluster_info.base_config, cluster_info.vslot_map)
+ groups_info.unhealthy_groups = groups_info.all_groups - groups_info.healthy_groups
+ groups_info.healthy_vslots = [
+ vslot
+ for vslot in cluster_info.base_config.VSlot
+ if vslot.GroupId in groups_info.healthy_groups
+ ]
+ return groups_info
-def do(args):
- while True:
- common.flush_cache()
+ def filter_healthy_vslots(self, vslots):
+ return [vslot for vslot in vslots if vslot.GroupId in self.healthy_groups]
- base_config = common.fetch_base_config()
- storage_pools = common.fetch_storage_pools()
- node_mon_map = common.fetch_node_mon_map({vslot.VSlotId.NodeId for vslot in base_config.VSlot})
- vslot_map = common.build_vslot_map(base_config)
- pdisk_map = common.build_pdisk_map(base_config)
- pdisk_usage = common.build_pdisk_usage_map(base_config, count_donors=False)
- pdisk_usage_w_donors = common.build_pdisk_usage_map(base_config, count_donors=True)
-
- storage_pool_names_map = common.build_storage_pool_names_map(storage_pools)
- group_id_to_storage_pool_name_map = {
- group_id: storage_pool_names_map[(group.BoxId, group.StoragePoolId)]
- for group_id, group in common.build_group_map(base_config).items()
- if (group.BoxId, group.StoragePoolId) != (0, 0) # static group
- }
- vdisks_groups_count_map = defaultdict(int)
- for group in base_config.Group:
- num = sum(vslot.Status == 'READY' for vslot in common.vslots_of_group(group, vslot_map)) - len(group.VSlotId)
- vdisks_groups_count_map[num] += 1
-
- if any(k < -1 for k in vdisks_groups_count_map.keys()):
- common.print_if_not_quiet(args, 'There are groups with more than one non READY vslot, waiting...', sys.stdout)
- common.print_if_verbose(args, f'Number of non READY vdisks -> number of groups: {sorted(vdisks_groups_count_map.items())}', file=sys.stdout)
- time.sleep(15)
- continue
-
- if args.max_replicating_pdisks is not None:
- replicating_pdisks = set()
- for vslot in base_config.VSlot:
- if vslot.Status != 'READY' and vslot.Status != 'ERROR':
- replicating_pdisks.add(common.get_pdisk_id(vslot.VSlotId))
-
- if len(replicating_pdisks) > args.max_replicating_pdisks:
- common.print_if_not_quiet(args, 'Waiting for %d pdisks to finish replication...' % (len(replicating_pdisks) - args.max_replicating_pdisks), sys.stdout)
- common.print_if_verbose(args, 'Replicating pdisks: ' + ', '.join('[%d:%d]' % x for x in sorted(replicating_pdisks)), file=sys.stdout)
- time.sleep(15)
- continue
+def list_overpopulated_pdisks(cluster_info):
+ overpopulated_pdisks = set()
+ for pdisk_id in cluster_info.pdisk_map.keys():
+ expected_slot_count = cluster_info.pdisk_map[pdisk_id].ExpectedSlotCount
+ pdisk_usage = cluster_info.pdisk_usage[pdisk_id]
+ if expected_slot_count and pdisk_usage > expected_slot_count:
+ overpopulated_pdisks.add(pdisk_id)
+ return overpopulated_pdisks
- all_groups = common.select_groups(base_config)
- healthy_groups = common.filter_healthy_groups(all_groups, node_mon_map, base_config, vslot_map)
- unhealthy_groups = all_groups - healthy_groups
- if unhealthy_groups:
- common.print_if_verbose(args, 'Skipping vdisks from unhealthy groups: %s' % (unhealthy_groups), file=sys.stdout)
- healthy_vslots = [
- vslot
- for vslot in base_config.VSlot
- if vslot.GroupId in healthy_groups
- ]
+def filter_vslots_by_group_ids(vslots, group_ids):
+ if group_ids is None:
+ return vslots
+ return [vslot for vslot in vslots if vslot.GroupId in group_ids]
- overpopulated_pdisks = set()
- for pdisk_id in pdisk_map.keys():
- if pdisk_map[pdisk_id].ExpectedSlotCount and pdisk_usage[pdisk_id] > pdisk_map[pdisk_id].ExpectedSlotCount:
- overpopulated_pdisks.add(pdisk_id)
- if not overpopulated_pdisks:
- common.print_if_not_quiet(args, 'No overpopulated pdisks found', sys.stdout)
- if args.only_from_overpopulated_pdisks:
- common.print_status(args, success=True, error_reason='')
- break
+def filter_vslots_by_pdisks(vslots, pdisks):
+ if pdisks is None:
+ return vslots
+ return [vslot for vslot in vslots if common.get_pdisk_id(vslot.VSlotId) in pdisks]
- healthy_vslots_from_overpopulated_pdisks = []
- for vslot in base_config.VSlot:
- pdisk_id = common.get_pdisk_id(vslot.VSlotId)
- if pdisk_id not in overpopulated_pdisks:
- continue
- if vslot.GroupId not in healthy_groups:
- continue
- healthy_vslots_from_overpopulated_pdisks.append(vslot)
-
- candidate_vslots = []
- if healthy_vslots_from_overpopulated_pdisks:
- common.print_if_not_quiet(args, f'Found {len(healthy_vslots_from_overpopulated_pdisks)} vdisks in healthy groups from overpopulated pdisks', sys.stdout)
- candidate_vslots = healthy_vslots_from_overpopulated_pdisks
- elif healthy_vslots and not args.only_from_overpopulated_pdisks:
- common.print_if_not_quiet(args, f'Found {len(healthy_vslots)} vdisks in healthy groups', sys.stdout)
- candidate_vslots = healthy_vslots
-
- if args.storage_pool is not None:
- existing_storage_pools = set(group_id_to_storage_pool_name_map.values())
- if args.storage_pool not in existing_storage_pools:
- print(f"Storage pool {args.storage_pool} not found in existing storage pools: {existing_storage_pools}")
- sys.exit(1)
- candidate_vslots = [vslot for vslot in candidate_vslots if group_id_to_storage_pool_name_map[vslot.GroupId] == args.storage_pool]
- common.print_if_not_quiet(args, f'Found {len(candidate_vslots)} vdisks in {args.storage_pool} sotrage pool', sys.stdout)
-
- if args.max_donors_per_pdisk > 0:
- donors_per_pdisk = common.build_donors_per_pdisk_map(base_config)
- candidate_vslots = [vslot for vslot in candidate_vslots if donors_per_pdisk[common.get_pdisk_id(vslot.VSlotId)] < args.max_donors_per_pdisk]
- common.print_if_not_quiet(args, f'Found {len(candidate_vslots)} vdisks with donors per pdisk < {args.max_donors_per_pdisk}', sys.stdout)
-
- if len(candidate_vslots) == 0:
- common.print_if_not_quiet(args, 'No vdisks suitable for relocation found, waiting..', sys.stdout)
- time.sleep(10)
- continue
-
- histo = Counter(pdisk_usage.values())
- common.print_if_verbose(args, 'Number of used slots -> number pdisks: ' + ' '.join('%d=>%d' % (k, histo[k]) for k in sorted(histo)), file=sys.stdout)
-
- def do_reassign(vslot, try_blocking):
- pdisk_id = common.get_pdisk_id(vslot.VSlotId)
- vslot_id = common.get_vslot_id(vslot.VSlotId)
-
- common.print_if_verbose(args, 'Checking to relocate vdisk from vslot %s on pdisk %s with slot usage %d' % (vslot_id, pdisk_id, pdisk_usage[pdisk_id]), file=sys.stdout)
-
- current_usage = pdisk_usage[pdisk_id]
- if not healthy_vslots_from_overpopulated_pdisks:
- for i in range(0, current_usage - 1):
- if histo[i]:
- break
- else:
- return False
-
- def add_update_drive_status(request, pdisk, status):
- cmd = request.Command.add().UpdateDriveStatus
- cmd.HostKey.NodeId = pdisk.NodeId
- cmd.PDiskId = pdisk.PDiskId
- cmd.Status = status
-
- def add_reassign_cmd(request, vslot):
- cmd = request.Command.add().ReassignGroupDisk
- cmd.GroupId = vslot.GroupId
- cmd.GroupGeneration = vslot.GroupGeneration
- cmd.FailRealmIdx = vslot.FailRealmIdx
- cmd.FailDomainIdx = vslot.FailDomainIdx
- cmd.VDiskIdx = vslot.VDiskIdx
+def filter_vslots_by_donors_per_pdisk(vslots, cluster_info, max_donors_per_pdisk):
+ if max_donors_per_pdisk == 0:
+ return vslots
+ donors_per_pdisk = common.build_donors_per_pdisk_map(cluster_info.base_config)
+ return [vslot for vslot in vslots if donors_per_pdisk[common.get_pdisk_id(vslot.VSlotId)] < max_donors_per_pdisk]
+
+
+class IBalancingStrategy:
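+    # Hook contract used by balance_iteration(): verify_cluster_state() and
+    # calculate_extra_info() return True on fatal error, check_waiting_conditions()
+    # returns True when the iteration should sleep and retry, and
+    # check_success_conditions() returns True when balancing is already complete.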
+ def __init__(self, args, cluster_info, groups_info):
+ self.args = args
+ self.cluster_info = cluster_info
+ self.groups_info = groups_info
+
+ def verify_cluster_state(self):
+ return False
+
+ def check_waiting_conditions(self):
+ return False
+
+ def check_success_conditions(self):
+ return False
+
+ def calculate_extra_info(self):
+        return False
+
+ def list_candidate_vslots(self):
+ return self.groups_info.healthy_vslots, False
+
+ def filter_must_first_vslots(self, candidate_vslots):
+ return candidate_vslots
+
+ def order_candidate_vslots(self, candidate_vslots):
+ return [candidate_vslots]
+
+ def reassign_vslot(self, vslot, try_blocking):
+ return False
+
+
+class BalancingStrategy(IBalancingStrategy):
+ def __init__(self, args, cluster_info, groups_info):
+ super().__init__(args, cluster_info, groups_info)
+ self.histo = None
+
+ def verify_cluster_state(self):
+ existing_storage_pools = set(self.cluster_info.group_id_to_storage_pool_name_map.values())
+ if self.args.storage_pool is not None and self.args.storage_pool not in existing_storage_pools:
+            print(f"Storage pool {self.args.storage_pool} not found in existing storage pools: {sorted(existing_storage_pools)}")
+ return True
+ return False
+
+ def check_waiting_conditions(self):
+ if any(k < -1 for k in self.cluster_info.vdisks_groups_count_map.keys()):
+ common.print_if_not_quiet(self.args, 'There are groups with more than one non READY vslot, waiting...', sys.stdout)
+ groups_count_str = ', '.join(f'{k}: {v}' for k, v in sorted(self.cluster_info.vdisks_groups_count_map.items()))
+ common.print_if_verbose(self.args, f'Number of non READY vdisks -> number of groups: {groups_count_str}', file=sys.stdout)
+ return True
+
+ if self.args.max_replicating_pdisks is not None:
+ replicating_pdisks = self.cluster_info.list_replicating_pdisks()
+ if len(replicating_pdisks) > self.args.max_replicating_pdisks:
+ over_replication_count = len(replicating_pdisks) - self.args.max_replicating_pdisks
+ common.print_if_not_quiet(self.args, f'Waiting for {over_replication_count} pdisks to finish replication...', sys.stdout)
+ replicating_pdisks_str = ', '.join(f'[{x[0]}:{x[1]}]' for x in sorted(replicating_pdisks))
+ common.print_if_verbose(self.args, f'Replicating pdisks: {replicating_pdisks_str}', file=sys.stdout)
+                return True
+        return False
+
+ def calculate_extra_info(self):
+ self.histo = Counter(self.cluster_info.pdisk_usage.values())
+ common.print_if_verbose(self.args, 'Number of used slots -> number pdisks: ' + ' '.join('%d=>%d' % (k, self.histo[k]) for k in sorted(self.histo)), file=sys.stdout)
+ return False
+
+ def filter_must_first_vslots(self, candidate_vslots):
+ if self.args.only_from_overpopulated_pdisks:
+ overpopulated_pdisks = list_overpopulated_pdisks(self.cluster_info)
+ return filter_vslots_by_pdisks(candidate_vslots, overpopulated_pdisks)
+ return candidate_vslots
+
+ def list_candidate_vslots(self):
+ candidate_vslots = self.cluster_info.base_config.VSlot
+ candidate_vslots = filter_vslots_by_group_ids(candidate_vslots, self.args.group_ids)
+ candidate_vslots = self.filter_must_first_vslots(candidate_vslots)
+ if not candidate_vslots:
+ return [], True
+ candidate_vslots = self.groups_info.filter_healthy_vslots(candidate_vslots)
+ candidate_vslots = filter_vslots_by_donors_per_pdisk(candidate_vslots, self.cluster_info, self.args.max_donors_per_pdisk)
+ return candidate_vslots, False
+
+ def order_candidate_vslots(self, candidate_vslots):
+ return []
+
+ def _add_update_drive_status(self, request, pdisk, status):
+ cmd = request.Command.add().UpdateDriveStatus
+ cmd.HostKey.NodeId = pdisk.NodeId
+ cmd.PDiskId = pdisk.PDiskId
+ cmd.Status = status
+
+ def _add_reassign_cmd(self, request, vslot):
+ cmd = request.Command.add().ReassignGroupDisk
+ cmd.GroupId = vslot.GroupId
+ cmd.GroupGeneration = vslot.GroupGeneration
+ cmd.FailRealmIdx = vslot.FailRealmIdx
+ cmd.FailDomainIdx = vslot.FailDomainIdx
+ cmd.VDiskIdx = vslot.VDiskIdx
+
+ def reassign_vslot(self, vslot, try_blocking):
+ pdisk_id = common.get_pdisk_id(vslot.VSlotId)
+ vslot_id = common.get_vslot_id(vslot.VSlotId)
+
+ pdisk_usage = self.cluster_info.pdisk_usage
+ pdisk_usage_w_donors = self.cluster_info.pdisk_usage_w_donors
+ pdisk_map = self.cluster_info.pdisk_map
+ histo = self.histo
+
+ common.print_if_verbose(self.args, 'Checking to relocate vdisk from vslot %s on pdisk %s with slot usage %d' % (vslot_id, pdisk_id, pdisk_usage[pdisk_id]), file=sys.stdout)
+
+ current_usage = pdisk_usage[pdisk_id]
+ if not self.args.only_from_overpopulated_pdisks:
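+            # for-else: proceed only if some pdisk currently holds at most current_usage - 2 slots, so the move reduces imbalance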
+ for i in range(0, current_usage - 1):
+ if histo[i]:
+ break
+ else:
+ return False
+
+ request = common.kikimr_bsconfig.TConfigRequest(Rollback=True)
+ index = len(request.Command)
+ self._add_reassign_cmd(request, vslot)
+ response = common.invoke_bsc_request(request)
+ if len(response.Status) != 1 or not response.Status[0].Success:
+ return False
+ item = response.Status[index].ReassignedItem[0]
+ pdisk_from = item.From.NodeId, item.From.PDiskId
+ pdisk_to = item.To.NodeId, item.To.PDiskId
+ if pdisk_usage[pdisk_to] + 1 > pdisk_usage[pdisk_from] - 1:
+ if pdisk_usage_w_donors[pdisk_to] + 1 > pdisk_map[pdisk_to].ExpectedSlotCount:
+ common.print_if_not_quiet(
+ self.args,
+                'NOTICE: Attempted to reassign vdisk from pdisk [%d:%d] to pdisk [%d:%d] with slot usage %d and slot limit %d on latter' % (
+                    *pdisk_from, *pdisk_to, pdisk_usage_w_donors[pdisk_to], pdisk_map[pdisk_to].ExpectedSlotCount),
+                sys.stdout)
+ return False
+
+ if not try_blocking:
+ return False
request = common.kikimr_bsconfig.TConfigRequest(Rollback=True)
+ inactive = []
+ for pdisk in self.cluster_info.base_config.PDisk:
+ check_pdisk_id = common.get_pdisk_id(pdisk)
+ disk_is_better = pdisk_usage_w_donors[check_pdisk_id] + 1 <= pdisk_map[check_pdisk_id].ExpectedSlotCount
+ if disk_is_better:
+                if not self.args.only_from_overpopulated_pdisks and pdisk_usage[check_pdisk_id] + 1 > pdisk_usage[pdisk_id] - 1:
+                    disk_is_better = False
+                if self.args.only_from_overpopulated_pdisks:
+                    disk_is_better = False
+
+ if not disk_is_better:
+ self._add_update_drive_status(request, pdisk, common.kikimr_bsconfig.EDriveStatus.INACTIVE)
+ inactive.append(pdisk)
index = len(request.Command)
- add_reassign_cmd(request, vslot)
+ self._add_reassign_cmd(request, vslot)
+ for pdisk in inactive:
+ self._add_update_drive_status(request, pdisk, pdisk.DriveStatus)
response = common.invoke_bsc_request(request)
- if len(response.Status) != 1 or not response.Status[0].Success:
+ if len(response.Status) != 1 or not response.Status[index].Success:
return False
- item = response.Status[index].ReassignedItem[0]
- pdisk_from = item.From.NodeId, item.From.PDiskId
- pdisk_to = item.To.NodeId, item.To.PDiskId
- if pdisk_usage[pdisk_to] + 1 > pdisk_usage[pdisk_from] - 1:
- if pdisk_usage_w_donors[pdisk_to] + 1 > pdisk_map[pdisk_to].ExpectedSlotCount:
- common.print_if_not_quiet(
- args,
- 'NOTICE: Attempted to reassign vdisk from pdisk [%d:%d] to pdisk [%d:%d] with slot usage %d and slot limit %d on latter',
- *pdisk_from, *pdisk_to, pdisk_usage_w_donors[pdisk_to], pdisk_map[pdisk_to].ExpectedSlotCount)
- return False
-
- if not try_blocking:
- return False
- request = common.kikimr_bsconfig.TConfigRequest(Rollback=True)
- inactive = []
- for pdisk in base_config.PDisk:
- check_pdisk_id = common.get_pdisk_id(pdisk)
- disk_is_better = pdisk_usage_w_donors[check_pdisk_id] + 1 <= pdisk_map[check_pdisk_id].ExpectedSlotCount
- if disk_is_better:
- if not healthy_vslots_from_overpopulated_pdisks and pdisk_usage[check_pdisk_id] + 1 > pdisk_usage[pdisk_id] - 1:
- disk_is_better = False
- if healthy_vslots_from_overpopulated_pdisks:
- disk_is_better = False
-
- if not disk_is_better:
- add_update_drive_status(request, pdisk, common.kikimr_bsconfig.EDriveStatus.INACTIVE)
- inactive.append(pdisk)
- index = len(request.Command)
- add_reassign_cmd(request, vslot)
- for pdisk in inactive:
- add_update_drive_status(request, pdisk, pdisk.DriveStatus)
- response = common.invoke_bsc_request(request)
- if len(response.Status) != 1 or not response.Status[index].Success:
- return False
-
- request.Rollback = args.dry_run
- response = common.invoke_bsc_request(request)
- if response.Status[index].Success:
- from_pdisk_id = common.get_pdisk_id(response.Status[index].ReassignedItem[0].From)
- to_pdisk_id = common.get_pdisk_id(response.Status[index].ReassignedItem[0].To)
- common.print_if_not_quiet(
- args,
- 'Relocated vdisk from pdisk [%d:%d] to pdisk [%d:%d] with slot usages (%d -> %d)' % (*from_pdisk_id, *to_pdisk_id, pdisk_usage[from_pdisk_id], pdisk_usage[to_pdisk_id]),
- file=sys.stdout)
+ request.Rollback = self.args.dry_run
+ response = common.invoke_bsc_request(request)
- if not common.is_successful_bsc_response(response):
- common.print_request_result(args, request, response)
- sys.exit(1)
+ if response.Status[index].Success:
+ from_pdisk_id = common.get_pdisk_id(response.Status[index].ReassignedItem[0].From)
+ to_pdisk_id = common.get_pdisk_id(response.Status[index].ReassignedItem[0].To)
+ common.print_if_not_quiet(
+ self.args,
+ 'Relocated vdisk from pdisk [%d:%d] to pdisk [%d:%d] with slot usages (%d -> %d)' % (*from_pdisk_id, *to_pdisk_id, pdisk_usage[from_pdisk_id], pdisk_usage[to_pdisk_id]),
+ file=sys.stdout)
- return True
- # end of do_reassign()
-
- vslots_ordered_groups_to_reassign = None
- if args.sort_by == 'slots':
- vslots_by_pdisk_slot_usage = defaultdict(list)
- for vslot in candidate_vslots:
- pdisk_id = common.get_pdisk_id(vslot.VSlotId)
- pdisk_slot_usage = pdisk_usage[pdisk_id]
- vslots_by_pdisk_slot_usage[pdisk_slot_usage].append(vslot)
- vslots_ordered_groups_to_reassign = [vslots for _, vslots in sorted(vslots_by_pdisk_slot_usage.items(), reverse=True)]
- elif args.sort_by == 'space_ratio':
- pdisks = {
- pdisk_id: {
- "FreeSpaceRatio": float(pdisk.PDiskMetrics.AvailableSize) / float(pdisk.PDiskMetrics.TotalSize),
- "CandidateVSlots": [],
- }
- for pdisk_id, pdisk in pdisk_map.items()
- if pdisk.PDiskMetrics.TotalSize > 0 # pdisk works
+ if not common.is_successful_bsc_response(response):
+ common.print_request_result(self.args, request, response)
+ sys.exit(1)
+
+ return True
+
+
+def order_vslots_by_pdisk_usage(vslots, pdisk_usage):
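+    # Bucket vslots by their pdisk's slot count; return buckets from most- to least-loaded pdisks.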
+ vslots_by_pdisk_slot_usage = defaultdict(list)
+ for vslot in vslots:
+ pdisk_id = common.get_pdisk_id(vslot.VSlotId)
+ pdisk_slot_usage = pdisk_usage[pdisk_id]
+ vslots_by_pdisk_slot_usage[pdisk_slot_usage].append(vslot)
+ return [vslots for _, vslots in sorted(vslots_by_pdisk_slot_usage.items(), reverse=True)]
+
+
+class VSlotBalancingStrategy(BalancingStrategy):
+ def __init__(self, args, cluster_info, groups_info):
+ super().__init__(args, cluster_info, groups_info)
+
+ def order_candidate_vslots(self, candidate_vslots):
+ return order_vslots_by_pdisk_usage(candidate_vslots, self.cluster_info.pdisk_usage)
+
+
+class SpaceRatioBalancingStrategy(BalancingStrategy):
+ def __init__(self, args, cluster_info, groups_info):
+ super().__init__(args, cluster_info, groups_info)
+
+ def order_candidate_vslots(self, candidate_vslots):
+ pdisks = {
+ pdisk_id: {
+ "FreeSpaceRatio": float(pdisk.PDiskMetrics.AvailableSize) / float(pdisk.PDiskMetrics.TotalSize),
+ "CandidateVSlots": [],
}
- for vslot in candidate_vslots:
- pdisk_id = common.get_pdisk_id(vslot.VSlotId)
- pdisks[pdisk_id]["CandidateVSlots"].append(vslot)
- vslots_ordered_groups_to_reassign = [info["CandidateVSlots"] for _, info in sorted(list(pdisks.items()), key=lambda x: x[1]["FreeSpaceRatio"])]
+ for pdisk_id, pdisk in self.cluster_info.pdisk_map.items()
+ if pdisk.PDiskMetrics.TotalSize > 0 # pdisk works
+ }
+ for vslot in candidate_vslots:
+ pdisk_id = common.get_pdisk_id(vslot.VSlotId)
+ pdisks[pdisk_id]["CandidateVSlots"].append(vslot)
+ return [info["CandidateVSlots"] for _, info in sorted(list(pdisks.items()), key=lambda x: x[1]["FreeSpaceRatio"])]
+
+
+class GroupVSlotsBalancingStrategy(BalancingStrategy):
+ def __init__(self, args, cluster_info, groups_info):
+ super().__init__(args, cluster_info, groups_info)
+ self.pdisk_groups_usage = None
+ self.total_vdisks = 0
+ self.ideal_distribution = None
+ self.max_vdisks_per_pdisk = None
+ self.min_vdisks_per_pdisk = None
+ self.group_pdisks = None
+ self.group_nodes = None
+ self.pdisk_ids = None
+ self.pdisk_type = None
+
+ def calculate_extra_info(self):
+ pdisk_types = set()
+ for pdisk in self.cluster_info.pdisk_map.values():
+ pdisk_types.add((pdisk.BoxId, pdisk.Type))
+
+ if len(pdisk_types) != 1:
+ print("All pdisks must be of the same type", file=sys.stderr)
+ return True
+
+ self.pdisk_type = pdisk_types.pop()
+ pdisk_box_id, pdisk_type = self.pdisk_type
- for vslots in vslots_ordered_groups_to_reassign:
- random.shuffle(vslots)
+ self.pdisk_ids = []
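+        # keep only pdisks of the selected box/type whose DriveStatus is 1 (ACTIVE)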
+ for pdisk_id, pdisk in self.cluster_info.pdisk_map.items():
+ if pdisk.BoxId == pdisk_box_id and pdisk.Type == pdisk_type and pdisk.DriveStatus == 1:
+ self.pdisk_ids.append(pdisk_id)
+
+ total_pdisks = len(self.pdisk_ids)
+
+ self.pdisk_groups_usage = defaultdict(int)
+
+ for pdisk_id in self.pdisk_ids:
+ self.pdisk_groups_usage[pdisk_id] = 0
+
+ vslots = filter_vslots_by_group_ids(self.cluster_info.base_config.VSlot, self.args.group_ids)
+
+ for vslot in vslots:
+ pdisk_id = common.get_pdisk_id(vslot.VSlotId)
+ self.pdisk_groups_usage[pdisk_id] += 1
+
+ self.total_vdisks = len(vslots)
+
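+        # ceil(total_vdisks / total_pdisks): the even-spread upper bound of vslots per pdisk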
+ self.ideal_distribution = (self.total_vdisks + total_pdisks - 1) // total_pdisks
+
+ self.max_vdisks_per_pdisk = max(self.pdisk_groups_usage.values())
+ self.min_vdisks_per_pdisk = min(self.pdisk_groups_usage.values())
+
+ self.group_pdisks = defaultdict(set)
+ self.group_nodes = defaultdict(set)
+ for vslot in vslots:
+ self.group_pdisks[vslot.GroupId].add(common.get_pdisk_id(vslot.VSlotId))
+ self.group_nodes[vslot.GroupId].add(vslot.VSlotId.NodeId)
+
+        common.print_if_not_quiet(self.args, f"VSlots distribution: {self.min_vdisks_per_pdisk}..{self.max_vdisks_per_pdisk} (ideal {self.ideal_distribution})", file=sys.stdout)
+        return False
+
+ def filter_must_first_vslots(self, candidate_vslots):
+ filtered_vslots = []
+ for vslot in candidate_vslots:
+ pdisk_id = common.get_pdisk_id(vslot.VSlotId)
+ if self.pdisk_groups_usage[pdisk_id] > self.ideal_distribution:
+ filtered_vslots.append(vslot)
+ if not filtered_vslots:
+ return candidate_vslots
+ return filtered_vslots
+
+ def order_candidate_vslots(self, candidate_vslots):
+ return order_vslots_by_pdisk_usage(candidate_vslots, self.pdisk_groups_usage)
+
+ def check_success_conditions(self):
+ if self.max_vdisks_per_pdisk <= self.ideal_distribution and self.min_vdisks_per_pdisk + 1 >= self.ideal_distribution:
+ common.print_if_verbose(self.args, "VDisk distribution is close to ideal", file=sys.stdout)
+ return True
+ return False
+
+ def _make_reassign_request(self, vslot, target_pdisk_id):
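+        # ReassignGroupDisk with an explicit TargetPDiskId pins the move to the chosen pdisk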
+ request = common.create_bsc_request(self.args)
+ cmd = request.Command.add().ReassignGroupDisk
+ cmd.GroupId = vslot.GroupId
+ cmd.GroupGeneration = vslot.GroupGeneration
+ cmd.FailRealmIdx = vslot.FailRealmIdx
+ cmd.FailDomainIdx = vslot.FailDomainIdx
+ cmd.VDiskIdx = 0
+ target = cmd.TargetPDiskId
+ target.NodeId = target_pdisk_id[0]
+ target.PDiskId = target_pdisk_id[1]
+ return request
+
+ def reassign_vslot(self, vslot, try_blocking):
+ pdisk_id = common.get_pdisk_id(vslot.VSlotId)
+ vslot_id = common.get_vslot_id(vslot.VSlotId)
+ common.print_if_verbose(self.args, f"Reassigning vdisk {vslot_id} of group {vslot.GroupId} from pdisk {pdisk_id}", sys.stdout)
+
+ current_pool_vdisks = self.pdisk_groups_usage[pdisk_id]
+ ideal_vdisks = self.ideal_distribution
+
+ if self.max_vdisks_per_pdisk > ideal_vdisks and current_pool_vdisks <= ideal_vdisks:
+ common.print_if_verbose(
+ self.args,
+                f"Current pool vdisks {current_pool_vdisks} on pdisk {pdisk_id} does not exceed ideal {ideal_vdisks}",
+ sys.stdout,
+ )
+ return False
+
+ min_pool_vdisks = self.max_vdisks_per_pdisk
+ target_pdisk = None
+
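+        # pick the least-loaded eligible pdisk: skip pdisks already hosting the group,
+        # non-ACTIVE drives and, unless --allow-same-node, other nodes that already host
+        # the group (the source node stays eligible because this vslot is leaving it)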
+ for check_pdisk_id, current_count in self.pdisk_groups_usage.items():
+ if check_pdisk_id == pdisk_id:
+ continue
+
+ if check_pdisk_id in self.group_pdisks[vslot.GroupId]:
+ continue
+
+ if check_pdisk_id[0] != pdisk_id[0] and not self.args.allow_same_node and check_pdisk_id[0] in self.group_nodes[vslot.GroupId]:
+ continue
+
+ check_pdisk = self.cluster_info.pdisk_map[check_pdisk_id]
+ if check_pdisk.DriveStatus != 1:
+ continue
+
+ if current_count < min_pool_vdisks:
+ min_pool_vdisks = current_count
+ target_pdisk = check_pdisk_id
+
+ if current_pool_vdisks <= min_pool_vdisks + 1:
+ common.print_if_verbose(
+ self.args,
+ f"Current pool vdisks {current_pool_vdisks} on pdisk {pdisk_id} is not greater than min pool vdisks + 1 ({min_pool_vdisks + 1})",
+ sys.stdout,
+ )
+ return False
+
+ if not target_pdisk:
+ common.print_if_verbose(
+ self.args,
+                "No suitable target pdisk found",
+ sys.stdout,
+ )
+ return False
+
+ if min_pool_vdisks >= ideal_vdisks:
+ common.print_if_verbose(
+ self.args,
+                f"Min pool vdisks {min_pool_vdisks} among target pdisks is not less than ideal {ideal_vdisks}",
+ sys.stdout,
+ )
+ return False
+
+ request = self._make_reassign_request(vslot, target_pdisk)
+
+ response = common.invoke_bsc_request(request)
+ if not common.is_successful_bsc_response(response):
+ common.print_request_result(self.args, request, response)
+ return False
+
+ common.print_if_not_quiet(
+ self.args,
+ f"Reassigned vdisk {vslot_id} from pdisk {pdisk_id} ({current_pool_vdisks} vdisks) to pdisk {target_pdisk} ({min_pool_vdisks} vdisks)",
+ sys.stdout,
+ )
+ return True
+
+
+def balance_iteration(args, strategy, iteration_number):
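+    # Returns True to stop with success, False to stop with an error, None to run another iteration.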
+ if strategy.calculate_extra_info():
+ common.print_status(args, success=False, error_reason='Failed to calculate extra info')
+ return False
+ if strategy.verify_cluster_state():
+ common.print_status(args, success=False, error_reason='Cluster state is not valid')
+ return False
+ if strategy.check_waiting_conditions():
+ common.print_if_verbose(args, "Waiting for cluster state to become ready", file=sys.stdout)
+ time.sleep(Constants.WAITING_TIME)
+ return None
+ if strategy.check_success_conditions():
+ common.print_status(args, success=True, error_reason='')
+ return True
+
+ candidate_vslots, actually_no_vslots = strategy.list_candidate_vslots()
+ if actually_no_vslots:
+ common.print_status(args, success=True, error_reason='No vdisks suitable for relocation found')
+ return True
+
+ if not candidate_vslots:
+        common.print_if_not_quiet(args, 'No vdisks suitable for relocation found, waiting...', sys.stdout)
+ time.sleep(Constants.WAITING_TIME)
+ return None
+
+ vslots_ordered_groups_to_reassign = strategy.order_candidate_vslots(candidate_vslots)
+ common.print_if_verbose(args, f"Found {len(candidate_vslots)} candidate vslots, {len(vslots_ordered_groups_to_reassign)} groups to reassign", file=sys.stdout)
+ was_sent = False
+ for vslots in vslots_ordered_groups_to_reassign:
+ random.shuffle(vslots)
+ for vslot in vslots:
+ if strategy.reassign_vslot(vslot, False):
+ was_sent = True
+ break
+ else:
for vslot in vslots:
- if do_reassign(vslot, False):
+ if strategy.reassign_vslot(vslot, True):
+ was_sent = True
break
else:
- for vslot in vslots:
- if do_reassign(vslot, True):
- break
- else:
- continue
- break
- else:
- common.print_status(args, success=True, error_reason='')
+ continue
+ break
+ else:
+ common.print_if_verbose(args, "No vdisks were sent for relocation", file=sys.stdout)
+ common.print_status(args, success=True, error_reason='')
+ return True
+ if not was_sent:
+        common.print_if_not_quiet(args, 'No relocation requests were successful, waiting...', sys.stdout)
+        time.sleep(Constants.WAITING_TIME)
+    elif Constants.TIME_BETWEEN_REASSIGNINGS > 0:
+        time.sleep(Constants.TIME_BETWEEN_REASSIGNINGS)
+ return None
+
+
+def do(args):
+ Constants.apply_args(args)
+ if args.sort_by == 'slots':
+ strategy_factory = VSlotBalancingStrategy
+ elif args.sort_by == 'space_ratio':
+ strategy_factory = SpaceRatioBalancingStrategy
+ elif args.sort_by == 'group_slots':
+ strategy_factory = GroupVSlotsBalancingStrategy
+ else:
+ print(f"Unknown sort by option: {args.sort_by}", file=sys.stderr)
+ sys.exit(1)
+
+ cluster_info = ClusterInfo.collect_cluster_info()
+ existing_storage_pools = set(cluster_info.storage_pool_names_map.values())
+ if args.storage_pool is not None and args.storage_pool not in existing_storage_pools:
+        print(f"Storage pool {args.storage_pool} not found in existing storage pools: {sorted(existing_storage_pools)}")
+ sys.exit(1)
+
+ groups_info = GroupsInfo.collect_groups_info(cluster_info)
+ if args.sort_by == 'group_slots':
+ if args.group_ids is None and args.storage_pool is None:
+ print("Group vslots balancing requires --group-ids or --storage-pool option", file=sys.stderr)
+ sys.exit(1)
+
+ if args.group_ids is not None:
+ args.group_ids = set(int(group_id) for group_id in args.group_ids)
+ elif args.storage_pool is not None:
+        args.group_ids = set(group_id for group_id, pool_name in cluster_info.group_id_to_storage_pool_name_map.items() if pool_name == args.storage_pool)
+
+ iteration_number = 1
+ while True:
+ common.print_if_not_quiet(args, f"\nStart balancing iteration {iteration_number}", file=sys.stdout)
+ if groups_info.unhealthy_groups:
+ common.print_if_verbose(args, f'Skipping vdisks from unhealthy groups: {groups_info.unhealthy_groups}', file=sys.stdout)
+
+ strategy = strategy_factory(args, cluster_info, groups_info)
+ balancing_result = balance_iteration(args, strategy, iteration_number)
+ if balancing_result is not None:
break
+ common.flush_cache()
+ cluster_info = ClusterInfo.collect_cluster_info()
+ groups_info = GroupsInfo.collect_groups_info(cluster_info)
+ iteration_number += 1
+ if balancing_result is False:
+ sys.exit(1)