author    | kruall <kruall@ydb.tech>    | 2025-07-21 10:34:08 +0300
committer | GitHub <noreply@github.com> | 2025-07-21 10:34:08 +0300
commit    | 544aa34bb5fd6d8844f1b2edeb1324c7292d44d2 (patch)
tree      | f19609b677eb32dfdde1b31de4732086b5b9b331
parent    | d1efe7c612de3b141de5f7b887514bcf893b7772 (diff)
download  | ydb-544aa34bb5fd6d8844f1b2edeb1324c7292d44d2.tar.gz
Add disconnect pile to workload (#21300)
-rw-r--r-- | ydb/apps/dstool/lib/common.py                          | 56
-rw-r--r-- | ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py | 34
2 files changed, 86 insertions, 4 deletions
diff --git a/ydb/apps/dstool/lib/common.py b/ydb/apps/dstool/lib/common.py
index 6f9e8a0ab0e..7564f0a9619 100644
--- a/ydb/apps/dstool/lib/common.py
+++ b/ydb/apps/dstool/lib/common.py
@@ -373,6 +373,7 @@ def query_random_host_with_retry(retries=5, request_type=None):
             explicit_host = binded.arguments.pop('explicit_host', None)
             host = binded.arguments.pop('host', None)
             endpoint = binded.arguments.pop('endpoint', None)
+            endpoints = binded.arguments.pop('endpoints', None)

             if endpoint is not None or host is not None:
                 return func(*args, **kwargs)
@@ -394,6 +395,10 @@ def query_random_host_with_retry(retries=5, request_type=None):
                 try_index, result = retry_query_with_endpoints(send_query, [explicit_endpoint] * retries, request_type, func.__name__, retries)
                 return result

+            if endpoints:
+                try_index, result = retry_query_with_endpoints(send_query, endpoints, request_type, func.__name__, retries)
+                return result
+
             if result is not None:
                 return result

@@ -448,7 +453,7 @@ def query_random_host_with_retry(retries=5, request_type=None):

 @inmemcache('fetch', ['path', 'params', 'explicit_host', 'fmt'], 'cache')
 @query_random_host_with_retry(request_type='http')
-def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True, method=None, data=None, content_type=None, accept=None, endpoint=None):
+def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True, method=None, data=None, content_type=None, accept=None, endpoint=None, endpoints=None):
     if endpoint is None and host is not None:
         endpoint = connection_params.make_endpoint_info(f'{connection_params.mon_protocol}://{host}')
     if endpoint.protocol not in ('http', 'https'):
@@ -477,7 +482,7 @@ def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True


 @query_random_host_with_retry(request_type='grpc')
-def invoke_grpc(func, *params, explicit_host=None, endpoint=None, stub_factory=kikimr_grpc.TGRpcServerStub):
+def invoke_grpc(func, *params, explicit_host=None, endpoint=None, stub_factory=kikimr_grpc.TGRpcServerStub, endpoints=None):
     options = [
         ('grpc.max_receive_message_length', 256 << 20),  # 256 MiB
     ]
@@ -593,6 +598,32 @@ def set_primary_pile(primary_pile_id, synchronized_piles):
     invoke_grpc('UpdateClusterState', request, stub_factory=bridge_grpc_server.BridgeServiceStub)


+def disconnect_pile(pile_id):
+    request = ydb_bridge.UpdateClusterStateRequest()
+    request.updates.add().CopyFrom(ydb_bridge.PileStateUpdate(
+        pile_id=pile_id,
+        state=ydb_bridge.PileState.DISCONNECTED
+    ))
+    invoke_grpc('UpdateClusterState', request, stub_factory=bridge_grpc_server.BridgeServiceStub)
+
+
+def connect_pile(pile_id, pile_to_endpoints):
+    request = ydb_bridge.UpdateClusterStateRequest()
+    request.updates.add().CopyFrom(ydb_bridge.PileStateUpdate(
+        pile_id=pile_id,
+        state=ydb_bridge.PileState.NOT_SYNCHRONIZED,
+    ))
+    request.specific_pile_ids.append(pile_id)
+    invoke_grpc('UpdateClusterState', request, stub_factory=bridge_grpc_server.BridgeServiceStub, endpoints=pile_to_endpoints[pile_id])
+    other_pile_ids = [x for x in pile_to_endpoints.keys() if x != pile_id]
+    request.specific_pile_ids.extend(other_pile_ids)
+    request.updates.add().CopyFrom(ydb_bridge.PileStateUpdate(
+        pile_id=pile_id,
+        state=ydb_bridge.PileState.NOT_SYNCHRONIZED,
+    ))
+    invoke_grpc('UpdateClusterState', request, stub_factory=bridge_grpc_server.BridgeServiceStub, endpoints=pile_to_endpoints[pile_id])
+
+
 def create_bsc_request(args):
     request = kikimr_bsconfig.TConfigRequest(Rollback=args.dry_run)

@@ -871,6 +902,13 @@ def build_node_fqdn_maps(base_config):
     return node_id_to_host, host_to_node_id


+def build_pile_to_node_id_map(base_config):
+    pile_to_node_id_map = defaultdict(list)
+    for node in base_config.Node:
+        pile_to_node_id_map[node.Location.BridgePileName].append(node.NodeId)
+    return pile_to_node_id_map
+
+
 def build_pdisk_map(base_config):
     pdisk_map = {
         get_pdisk_id(pdisk): pdisk
@@ -1080,6 +1118,20 @@ def fetch_node_mon_map(nodes=None):
     }


+def fetch_node_to_endpoint_map(nodes=None):
+    res = {}
+    for node_id, sysinfo in fetch_json_info('sysinfo', nodes).items():
+        grpc_port = None
+        mon_port = None
+        for ep in sysinfo.get('Endpoints', []):
+            if ep['Name'] == 'grpc':
+                grpc_port = int(ep['Address'][1:])
+            elif ep['Name'] == 'http-mon':
+                mon_port = int(ep['Address'][1:])
+        res[node_id] = EndpointInfo('grpc', sysinfo['Host'], grpc_port, mon_port)
+    return res
+
+
 def get_vslots_by_vdisk_ids(base_config, vdisk_ids):
     vdisk_vslot_map = {}
     for v in base_config.VSlot:
diff --git a/ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py b/ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py
index 0c3aa80ba50..50ddfe6d5e5 100644
--- a/ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py
+++ b/ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py
@@ -26,6 +26,8 @@ def add_options(p):
     p.add_argument('--no-fail-model-check', action='store_true', help='Do not check VDisk states before taking action')
     p.add_argument('--enable-soft-switch-piles', action='store_true', help='Enable soft switch pile with PROMOTE')
     p.add_argument('--enable-hard-switch-piles', action='store_true', help='Enable hard switch pile with setting PRIMARY')
+    p.add_argument('--enable-disconnect-piles', action='store_true', help='Enable disconnect pile')
+    p.add_argument('--fixed-pile-for-disconnect', type=int, help='Pile to disconnect')


 def fetch_start_time_map(base_config):
@@ -77,6 +79,17 @@ def do(args):

     config_retries = None

+    if args.enable_soft_switch_piles or args.enable_hard_switch_piles or args.enable_disconnect_piles:
+        base_config = common.fetch_base_config()
+        pile_name_to_node_id = common.build_pile_to_node_id_map(base_config)
+        piles_count = len(pile_name_to_node_id)
+        node_id_to_endpoints = common.fetch_node_to_endpoint_map()
+        pile_names = list(sorted(pile_name_to_node_id.keys()))
+        pile_id_to_endpoints = {
+            idx: [node_id_to_endpoints[node_id] for node_id in pile_name_to_node_id[pile_name]]
+            for idx, pile_name in enumerate(pile_names)
+        }
+
     while True:
         common.flush_cache()

@@ -286,6 +299,14 @@ def do(args):
             print(f"Switching primary pile to {pile_id} with setting PRIMARY")
             common.set_primary_pile(pile_id, [x for x in all_piles if x != pile_id])

+        def do_disconnect_pile(pile_id):
+            print(f"Disconnecting pile {pile_id}")
+            common.disconnect_pile(pile_id)
+
+        def do_connect_pile(pile_id, pile_id_to_hosts):
+            print(f"Connecting pile {pile_id}")
+            common.connect_pile(pile_id, pile_id_to_hosts)
+
         ################################################################################################################

         now = datetime.now(timezone.utc)
@@ -371,14 +392,16 @@ def do(args):
         if restarts:
             possible_actions.append(('restart', (pick, restarts)))

-        has_pile_operations = args.enable_soft_switch_piles or args.enable_hard_switch_piles
+        has_pile_operations = args.enable_soft_switch_piles or args.enable_hard_switch_piles or args.enable_disconnect_piles
         if has_pile_operations:
             piles_info = common.get_piles_info()
-            piles_count = len(piles_info.per_pile_state)
+            print(piles_info)
+
             primary_pile = None
             synchronized_piles = []
             promoted_piles = []
             non_synchronized_piles = []
+            disconnected_piles = []
             for idx, pile_state in enumerate(piles_info.per_pile_state):
                 if pile_state.state == ydb_bridge.PileState.PRIMARY:
                     primary_pile = idx
@@ -386,6 +409,8 @@ def do(args):
                     synchronized_piles.append(idx)
                 elif pile_state.state == ydb_bridge.PileState.PROMOTE:
                     promoted_piles.append(idx)
+                elif pile_state.state == ydb_bridge.PileState.DISCONNECTED:
+                    disconnected_piles.append(idx)
                 else:
                     non_synchronized_piles.append(idx)

@@ -396,6 +421,11 @@ def do(args):
                 possible_actions.append(('soft-switch-pile', (do_soft_switch_pile, random.choice(synchronized_piles))))
             if args.enable_hard_switch_piles and can_hard_switch:
                 possible_actions.append(('hard-switch-pile', (do_hard_switch_pile, random.choice(promoted_piles + synchronized_piles), [primary_pile] + promoted_piles + synchronized_piles)))
+            if len(disconnected_piles) > 0:
+                possible_actions.append(('connect-pile', (do_connect_pile, random.choice(disconnected_piles), pile_id_to_endpoints)))
+            if args.enable_disconnect_piles and len(synchronized_piles) > 0:
+                pile_to_disconnect = args.fixed_pile_for_disconnect if args.fixed_pile_for_disconnect is not None else random.choice([primary_pile] + synchronized_piles)
+                possible_actions.append(('disconnect-pile', (do_disconnect_pile, pile_to_disconnect)))

         if not possible_actions:
             common.print_if_not_quiet(args, 'Waiting for the next round...', file=sys.stdout)
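For orientation, the sketch below shows how the helpers introduced in common.py compose, in the same way the workload script wires them together when pile operations are enabled: build a pile-id to endpoints map once, then disconnect a pile and later reconnect it through that pile's own endpoints. This is an illustrative composition only, not part of the commit; the import path, an already initialized dstool connection, and the chosen pile id are assumptions.

```python
# Illustrative sketch (assumptions: this import path resolves and dstool's
# connection parameters are already configured, as they are when the workload
# command runs). All helper names below come from the diff above.
import ydb.apps.dstool.lib.common as common

base_config = common.fetch_base_config()
pile_name_to_node_ids = common.build_pile_to_node_id_map(base_config)  # pile name -> [node ids]
node_id_to_endpoints = common.fetch_node_to_endpoint_map()             # node id -> EndpointInfo

# Address piles by index in sorted-name order, mirroring the workload script.
pile_names = sorted(pile_name_to_node_ids.keys())
pile_id_to_endpoints = {
    idx: [node_id_to_endpoints[node_id] for node_id in pile_name_to_node_ids[name]]
    for idx, name in enumerate(pile_names)
}

pile_id = 1  # hypothetical pile chosen for the exercise
common.disconnect_pile(pile_id)                      # marks the pile DISCONNECTED
common.connect_pile(pile_id, pile_id_to_endpoints)   # brings it back as NOT_SYNCHRONIZED
```

In the workload itself the same flow is driven by the new --enable-disconnect-piles flag, and --fixed-pile-for-disconnect pins the choice to one pile instead of a random synchronized one.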