author     kruall <kruall@ydb.tech>      2025-07-21 10:34:08 +0300
committer  GitHub <noreply@github.com>   2025-07-21 10:34:08 +0300
commit     544aa34bb5fd6d8844f1b2edeb1324c7292d44d2 (patch)
tree       f19609b677eb32dfdde1b31de4732086b5b9b331
parent     d1efe7c612de3b141de5f7b887514bcf893b7772 (diff)
download   ydb-544aa34bb5fd6d8844f1b2edeb1324c7292d44d2.tar.gz
Add disconnect pile to workload (#21300)
-rw-r--r--  ydb/apps/dstool/lib/common.py                            56
-rw-r--r--  ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py   34
2 files changed, 86 insertions(+), 4 deletions(-)
diff --git a/ydb/apps/dstool/lib/common.py b/ydb/apps/dstool/lib/common.py
index 6f9e8a0ab0e..7564f0a9619 100644
--- a/ydb/apps/dstool/lib/common.py
+++ b/ydb/apps/dstool/lib/common.py
@@ -373,6 +373,7 @@ def query_random_host_with_retry(retries=5, request_type=None):
explicit_host = binded.arguments.pop('explicit_host', None)
host = binded.arguments.pop('host', None)
endpoint = binded.arguments.pop('endpoint', None)
+ endpoints = binded.arguments.pop('endpoints', None)
if endpoint is not None or host is not None:
return func(*args, **kwargs)
@@ -394,6 +395,10 @@ def query_random_host_with_retry(retries=5, request_type=None):
try_index, result = retry_query_with_endpoints(send_query, [explicit_endpoint] * retries, request_type, func.__name__, retries)
return result
+ if endpoints:
+ try_index, result = retry_query_with_endpoints(send_query, endpoints, request_type, func.__name__, retries)
+ return result
+
if result is not None:
return result
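The decorator now also honors an endpoints keyword: when a caller passes a list, the wrapped query is retried over exactly that list instead of a randomly picked host from the monitoring map. A minimal usage sketch with invented hosts and ports; EndpointInfo, the request type, and the stub factory are the same names this diff uses further down:

    pile_endpoints = [
        EndpointInfo('grpc', 'node-1.example', 2135, 8765),   # hosts/ports invented
        EndpointInfo('grpc', 'node-2.example', 2135, 8765),
    ]
    request = ydb_bridge.UpdateClusterStateRequest()
    # The decorator pops 'endpoints' and retries the call over this list.
    invoke_grpc('UpdateClusterState', request,
                stub_factory=bridge_grpc_server.BridgeServiceStub,
                endpoints=pile_endpoints)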
@@ -448,7 +453,7 @@ def query_random_host_with_retry(retries=5, request_type=None):
@inmemcache('fetch', ['path', 'params', 'explicit_host', 'fmt'], 'cache')
@query_random_host_with_retry(request_type='http')
-def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True, method=None, data=None, content_type=None, accept=None, endpoint=None):
+def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True, method=None, data=None, content_type=None, accept=None, endpoint=None, endpoints=None):
if endpoint is None and host is not None:
endpoint = connection_params.make_endpoint_info(f'{connection_params.mon_protocol}://{host}')
if endpoint.protocol not in ('http', 'https'):
@@ -477,7 +482,7 @@ def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True
@query_random_host_with_retry(request_type='grpc')
-def invoke_grpc(func, *params, explicit_host=None, endpoint=None, stub_factory=kikimr_grpc.TGRpcServerStub):
+def invoke_grpc(func, *params, explicit_host=None, endpoint=None, stub_factory=kikimr_grpc.TGRpcServerStub, endpoints=None):
options = [
('grpc.max_receive_message_length', 256 << 20), # 256 MiB
]
@@ -593,6 +598,32 @@ def set_primary_pile(primary_pile_id, synchronized_piles):
invoke_grpc('UpdateClusterState', request, stub_factory=bridge_grpc_server.BridgeServiceStub)
+def disconnect_pile(pile_id):
+ request = ydb_bridge.UpdateClusterStateRequest()
+ request.updates.add().CopyFrom(ydb_bridge.PileStateUpdate(
+ pile_id=pile_id,
+ state=ydb_bridge.PileState.DISCONNECTED
+ ))
+ invoke_grpc('UpdateClusterState', request, stub_factory=bridge_grpc_server.BridgeServiceStub)
+
+
+def connect_pile(pile_id, pile_to_endpoints):
+ request = ydb_bridge.UpdateClusterStateRequest()
+ request.updates.add().CopyFrom(ydb_bridge.PileStateUpdate(
+ pile_id=pile_id,
+ state=ydb_bridge.PileState.NOT_SYNCHRONIZED,
+ ))
+ request.specific_pile_ids.append(pile_id)
+ invoke_grpc('UpdateClusterState', request, stub_factory=bridge_grpc_server.BridgeServiceStub, endpoints=pile_to_endpoints[pile_id])
+ other_pile_ids = [x for x in pile_to_endpoints.keys() if x != pile_id]
+ request.specific_pile_ids.extend(other_pile_ids)
+ request.updates.add().CopyFrom(ydb_bridge.PileStateUpdate(
+ pile_id=pile_id,
+ state=ydb_bridge.PileState.NOT_SYNCHRONIZED,
+ ))
+ invoke_grpc('UpdateClusterState', request, stub_factory=bridge_grpc_server.BridgeServiceStub, endpoints=pile_to_endpoints[pile_id])
+
+
def create_bsc_request(args):
request = kikimr_bsconfig.TConfigRequest(Rollback=args.dry_run)
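Taken together, disconnect_pile and connect_pile drive the bridge state transitions that the workload exercises below: disconnect_pile marks a pile DISCONNECTED, and connect_pile moves it back to NOT_SYNCHRONIZED, addressing the request to the endpoints of the pile being reconnected. A hedged usage sketch that rebuilds the pile-id to endpoints map the same way the workload does; the common import path is an assumption:

    import ydb.apps.dstool.lib.common as common   # assumed import path

    base_config = common.fetch_base_config()
    pile_name_to_node_id = common.build_pile_to_node_id_map(base_config)
    node_id_to_endpoints = common.fetch_node_to_endpoint_map()
    pile_to_endpoints = {
        idx: [node_id_to_endpoints[n] for n in pile_name_to_node_id[name]]
        for idx, name in enumerate(sorted(pile_name_to_node_id))
    }

    common.disconnect_pile(1)                  # pile 1 -> DISCONNECTED (assumes pile id 1 exists)
    common.connect_pile(1, pile_to_endpoints)  # pile 1 -> NOT_SYNCHRONIZED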
@@ -871,6 +902,13 @@ def build_node_fqdn_maps(base_config):
return node_id_to_host, host_to_node_id
+def build_pile_to_node_id_map(base_config):
+ pile_to_node_id_map = defaultdict(list)
+ for node in base_config.Node:
+ pile_to_node_id_map[node.Location.BridgePileName].append(node.NodeId)
+ return pile_to_node_id_map
+
+
def build_pdisk_map(base_config):
pdisk_map = {
get_pdisk_id(pdisk): pdisk
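build_pile_to_node_id_map simply groups node ids by the BridgePileName recorded in each node's location. For illustration, with pile names and node ids invented:

    base_config = fetch_base_config()              # same helper the workload below uses
    pile_to_nodes = build_pile_to_node_id_map(base_config)
    # e.g. {'pile-a': [1, 2, 3], 'pile-b': [4, 5, 6]}
    for pile_name in sorted(pile_to_nodes):
        print(pile_name, pile_to_nodes[pile_name])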
@@ -1080,6 +1118,20 @@ def fetch_node_mon_map(nodes=None):
}
+def fetch_node_to_endpoint_map(nodes=None):
+ res = {}
+ for node_id, sysinfo in fetch_json_info('sysinfo', nodes).items():
+ grpc_port = None
+ mon_port = None
+ for ep in sysinfo.get('Endpoints', []):
+ if ep['Name'] == 'grpc':
+ grpc_port = int(ep['Address'][1:])
+ elif ep['Name'] == 'http-mon':
+ mon_port = int(ep['Address'][1:])
+ res[node_id] = EndpointInfo('grpc', sysinfo['Host'], grpc_port, mon_port)
+ return res
+
+
def get_vslots_by_vdisk_ids(base_config, vdisk_ids):
vdisk_vslot_map = {}
for v in base_config.VSlot:
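fetch_node_to_endpoint_map assumes each sysinfo endpoint address has the form ':<port>', so ep['Address'][1:] strips the leading colon before the int() conversion. A toy fragment showing the expected shape, with values invented:

    sysinfo = {
        'Host': 'node-1.example',
        'Endpoints': [
            {'Name': 'grpc', 'Address': ':2135'},
            {'Name': 'http-mon', 'Address': ':8765'},
        ],
    }
    grpc_port = int(sysinfo['Endpoints'][0]['Address'][1:])   # 2135
    mon_port = int(sysinfo['Endpoints'][1]['Address'][1:])    # 8765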
diff --git a/ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py b/ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py
index 0c3aa80ba50..50ddfe6d5e5 100644
--- a/ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py
+++ b/ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py
@@ -26,6 +26,8 @@ def add_options(p):
p.add_argument('--no-fail-model-check', action='store_true', help='Do not check VDisk states before taking action')
p.add_argument('--enable-soft-switch-piles', action='store_true', help='Enable soft switch pile with PROMOTE')
p.add_argument('--enable-hard-switch-piles', action='store_true', help='Enable hard switch pile with setting PRIMARY')
+ p.add_argument('--enable-disconnect-piles', action='store_true', help='Enable disconnect pile')
+ p.add_argument('--fixed-pile-for-disconnect', type=int, help='Pile to disconnect')
def fetch_start_time_map(base_config):
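With the new options, a run that exercises disconnects would presumably be started with something like ydb-dstool -e <endpoint> cluster workload run --enable-disconnect-piles, optionally pinning the victim with --fixed-pile-for-disconnect 0; the surrounding global options are the usual dstool ones and are not part of this diff.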
@@ -77,6 +79,17 @@ def do(args):
config_retries = None
+ if args.enable_soft_switch_piles or args.enable_hard_switch_piles or args.enable_disconnect_piles:
+ base_config = common.fetch_base_config()
+ pile_name_to_node_id = common.build_pile_to_node_id_map(base_config)
+ piles_count = len(pile_name_to_node_id)
+ node_id_to_endpoints = common.fetch_node_to_endpoint_map()
+ pile_names = list(sorted(pile_name_to_node_id.keys()))
+ pile_id_to_endpoints = {
+ idx: [node_id_to_endpoints[node_id] for node_id in pile_name_to_node_id[pile_name]]
+ for idx, pile_name in enumerate(pile_names)
+ }
+
while True:
common.flush_cache()
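Note that the pile ids used by the workload here are the positions of the pile names in sorted order, which is expected to match the ordering of per-pile state returned by get_piles_info below. A tiny illustration of how enumerate over the sorted names assigns the ids, with names invented:

    pile_name_to_node_id = {'pile-b': [4, 5, 6], 'pile-a': [1, 2, 3]}   # invented
    pile_names = list(sorted(pile_name_to_node_id.keys()))
    pile_ids = {idx: name for idx, name in enumerate(pile_names)}
    # pile_ids == {0: 'pile-a', 1: 'pile-b'}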
@@ -286,6 +299,14 @@ def do(args):
print(f"Switching primary pile to {pile_id} with setting PRIMARY")
common.set_primary_pile(pile_id, [x for x in all_piles if x != pile_id])
+ def do_disconnect_pile(pile_id):
+ print(f"Disconnecting pile {pile_id}")
+ common.disconnect_pile(pile_id)
+
+ def do_connect_pile(pile_id, pile_id_to_hosts):
+ print(f"Connecting pile {pile_id}")
+ common.connect_pile(pile_id, pile_id_to_hosts)
+
################################################################################################################
now = datetime.now(timezone.utc)
@@ -371,14 +392,16 @@ def do(args):
if restarts:
possible_actions.append(('restart', (pick, restarts)))
- has_pile_operations = args.enable_soft_switch_piles or args.enable_hard_switch_piles
+ has_pile_operations = args.enable_soft_switch_piles or args.enable_hard_switch_piles or args.enable_disconnect_piles
if has_pile_operations:
piles_info = common.get_piles_info()
- piles_count = len(piles_info.per_pile_state)
+ print(piles_info)
+
primary_pile = None
synchronized_piles = []
promoted_piles = []
non_synchronized_piles = []
+ disconnected_piles = []
for idx, pile_state in enumerate(piles_info.per_pile_state):
if pile_state.state == ydb_bridge.PileState.PRIMARY:
primary_pile = idx
@@ -386,6 +409,8 @@ def do(args):
synchronized_piles.append(idx)
elif pile_state.state == ydb_bridge.PileState.PROMOTE:
promoted_piles.append(idx)
+ elif pile_state.state == ydb_bridge.PileState.DISCONNECTED:
+ disconnected_piles.append(idx)
else:
non_synchronized_piles.append(idx)
@@ -396,6 +421,11 @@ def do(args):
possible_actions.append(('soft-switch-pile', (do_soft_switch_pile, random.choice(synchronized_piles))))
if args.enable_hard_switch_piles and can_hard_switch:
possible_actions.append(('hard-switch-pile', (do_hard_switch_pile, random.choice(promoted_piles + synchronized_piles), [primary_pile] + promoted_piles + synchronized_piles)))
+ if len(disconnected_piles) > 0:
+ possible_actions.append(('connect-pile', (do_connect_pile, random.choice(disconnected_piles), pile_id_to_endpoints)))
+ if args.enable_disconnect_piles and len(synchronized_piles) > 0:
+ pile_to_disconnect = args.fixed_pile_for_disconnect if args.fixed_pile_for_disconnect is not None else random.choice([primary_pile] + synchronized_piles)
+ possible_actions.append(('disconnect-pile', (do_disconnect_pile, pile_to_disconnect)))
if not possible_actions:
common.print_if_not_quiet(args, 'Waiting for the next round...', file=sys.stdout)
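The new actions follow the same ('name', (func, *args)) shape as the existing restart and switch actions; the selection and dispatch code lives elsewhere in this file and is not part of this diff, but the shape implies a dispatch along these lines (a sketch only, not the actual code):

    import random

    def do_disconnect_pile(pile_id):
        print(f"Disconnecting pile {pile_id}")

    possible_actions = [('disconnect-pile', (do_disconnect_pile, 0))]   # toy list
    name, action = random.choice(possible_actions)
    func, *func_args = action
    func(*func_args)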