diff options
author | kruall <kruall@ydb.tech> | 2024-11-20 12:24:56 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-20 12:24:56 +0300 |
commit | 2cf848f3e1004307b2b619c0263c6d7379acf7f4 (patch) | |
tree | eaf66786eaf803213f55fddf653a3eee81391771 | |
parent | 912022f5004fc384fd1996cae3f848bd1d44a28d (diff) | |
download | ydb-2cf848f3e1004307b2b619c0263c6d7379acf7f4.tar.gz |
Improve endpoints experience in ydb-dstool (#11756)
-rw-r--r-- | ydb/apps/dstool/lib/common.py | 306 | ||||
-rw-r--r-- | ydb/apps/dstool/lib/dstool_cmd_group_show_blob_info.py | 5 | ||||
-rw-r--r-- | ydb/apps/dstool/lib/dstool_cmd_group_show_storage_efficiency.py | 6 | ||||
-rw-r--r-- | ydb/apps/dstool/lib/dstool_cmd_pdisk_set.py | 2 | ||||
-rw-r--r-- | ydb/apps/dstool/lib/grouptool.py | 45 | ||||
-rwxr-xr-x | ydb/apps/dstool/main.py | 10 |
6 files changed, 236 insertions, 138 deletions
diff --git a/ydb/apps/dstool/lib/common.py b/ydb/apps/dstool/lib/common.py index f16c365282..fbaa41fb96 100644 --- a/ydb/apps/dstool/lib/common.py +++ b/ydb/apps/dstool/lib/common.py @@ -17,11 +17,13 @@ from functools import wraps from inspect import signature from operator import attrgetter, itemgetter from collections import defaultdict +from itertools import cycle, islice import ydb.core.protos.grpc_pb2_grpc as kikimr_grpc import ydb.core.protos.msgbus_pb2 as kikimr_msgbus import ydb.core.protos.blobstorage_config_pb2 as kikimr_bsconfig import ydb.core.protos.blobstorage_base3_pb2 as kikimr_bs3 import ydb.core.protos.cms_pb2 as kikimr_cms +from ydb.apps.dstool.lib.arg_parser import print_error_with_usage import typing @@ -34,22 +36,59 @@ EVirtualGroupState = kikimr_bs3.EVirtualGroupState TGroupDecommitStatus = kikimr_bs3.TGroupDecommitStatus +class InvalidParameterError(Exception): + """Exception raised for invalid command line parameter.""" + def __init__(self, parser, parameter_name, parameter, message=""): + self.parser = parser + self.parameter = parameter + self.parameter_name = parameter_name + self.message = message + super().__init__(self.message) + + def print(self): + print_error_with_usage(self.parser, f"ERROR: invalid parameter '{self.parameter_name}' with value '{self.parameter}'. {self.message}") + + class EndpointInfo: - def __init__(self, protocol: str, host: str, port: int): + def __init__(self, protocol: str, host: str, grpc_port: int, mon_port: int): self.protocol = protocol - self.port = port + self.grpc_port = grpc_port + self.mon_port = mon_port self.host = host + @property + def full(self): + if self.protocol in ('http', 'https'): + return f'{self.protocol}://{self.host_with_mon_port}' + else: + return f'{self.protocol}://{self.host_with_grpc_port}' + + @property + def host_with_port(self): + if self.protocol in ('http', 'https'): + return self.host_with_mon_port + else: + return self.host_with_grpc_port + + @property + def host_with_grpc_port(self): + return f'{self.host}:{self.grpc_port}' + + @property + def host_with_mon_port(self): + return f'{self.host}:{self.mon_port}' + class ConnectionParams: - ENDPOINT_HELP = 'Default protocol is http, default port is 8765' + ENDPOINT_HELP = 'Endpoint is specified as PROTOCOL://HOST[:PORT]. Can be specified multiple times with different protocols.' def __init__(self): self.hosts = set() self.endpoints = dict() - self.grpc_port = None - self.mon_port = None - self.mon_protocol = None + self.grpc_port = 2135 + self.mon_port = 8765 + self.grpc_protocol = 'grpc' + self.mon_protocol = 'http' self.token_type = None self.token = None self.domain = None @@ -59,10 +98,20 @@ class ConnectionParams: self.cafile = None self.cadata = None self.insecure = None - self.http = None + self.parser = None + self.use_ip = None + self.http_endpoints = dict() + self.grpc_endpoints = dict() + self.args = None + self.printed_warning_about_not_assigned_http_protocol = False + self.printed_warning_about_not_assigned_grpc_protocol = False def make_endpoint_info(self, endpoint: str): - return EndpointInfo(*self.get_protocol_host_port(endpoint)) + protocol, host, port = self.get_protocol_host_port(endpoint) + grpc_port = self.grpc_port if protocol not in ('grpc', 'grpcs') else port + mon_port = self.mon_port if protocol not in ('http', 'https') else port + endpoint_info = EndpointInfo(protocol, host, grpc_port, mon_port) + return endpoint_info def get_protocol_host_port(self, endpoint): protocol, sep, endpoint = endpoint.rpartition('://') @@ -72,7 +121,7 @@ class ConnectionParams: if sep == ':': return protocol, endpoint, int(port) else: - return protocol, endpoint, self.mon_port + return protocol, endpoint, self.grpc_port if protocol in ('grpc', 'grpcs') else self.mon_port def get_cafile_data(self): if self.cafile is None: @@ -96,10 +145,12 @@ class ConnectionParams: netloc = new_netloc return netloc - def make_url(self, host, path, params): - endpoint_info = self.endpoints[host] if host in self.endpoints else self.make_endpoint_info(host) - netloc = self.get_netloc(endpoint_info.host, endpoint_info.port) - return urllib.parse.urlunsplit((endpoint_info.protocol, netloc, path, urllib.parse.urlencode(params), '')) + def make_url(self, endpoint, path, params): + if self.use_ip: + location = self.get_netloc(endpoint.host, endpoint.port) + else: + location = endpoint.host_with_port + return urllib.parse.urlunsplit((endpoint.protocol, location, path, urllib.parse.urlencode(params), '')) def parse_token(self, token_file): if token_file: @@ -123,22 +174,30 @@ class ConnectionParams: self.token_type = 'OAuth' def apply_args(self, args, with_localhost=True): + self.args = args self.grpc_port = args.grpc_port self.mon_port = args.mon_port - self.mon_protocol = args.mon_protocol + protocols = defaultdict(int) if args.endpoint: for endpoint in args.endpoint: endpoint_info = self.make_endpoint_info(endpoint) - if self.mon_protocol is None: - self.mon_protocol = endpoint_info.protocol - host_with_port = '{0}:{1}'.format(endpoint_info.host, endpoint_info.port) + if endpoint_info.protocol not in ('http', 'https', 'grpc', 'grpcs'): + raise InvalidParameterError(self.parser, '--endpoint', endpoint, 'Invalid protocol specified for endpoint') + protocols[endpoint_info.protocol] += 1 + host_with_port = endpoint_info.host_with_port self.hosts.add(endpoint_info.host) self.endpoints[endpoint_info.host] = endpoint_info self.endpoints[host_with_port] = endpoint_info + if endpoint_info.protocol in ('http', 'https'): + self.http_endpoints[host_with_port] = endpoint_info + else: + self.grpc_endpoints[host_with_port] = endpoint_info - if self.mon_protocol is None: - self.mon_protocol = 'http' + if 'grpc' not in protocols and 'grpcs' in protocols: + self.grpc_protocol = 'grpcs' + if 'http' not in protocols and 'https' in protocols: + self.mon_protocol = 'https' self.parse_token(args.token_file) self.domain = 1 @@ -147,9 +206,9 @@ class ConnectionParams: self.http_timeout = args.http_timeout self.cafile = args.cafile self.insecure = args.insecure - self.http = args.http def add_host_access_options(self, parser, with_endpoint=True): + self.parser = parser parser.add_argument('--verbose', '-v', action='store_true', help='Be verbose during operation') parser.add_argument('--quiet', '-q', action='store_true', help="Don't show non-vital messages") g = parser.add_argument_group('Server access options') @@ -157,12 +216,11 @@ class ConnectionParams: g.add_argument('--endpoint', '-e', metavar='[PROTOCOL://]HOST[:PORT]', type=str, required=True, action='append', help=ConnectionParams.ENDPOINT_HELP) g.add_argument('--grpc-port', type=int, default=2135, metavar='PORT', help='GRPC port to use for procedure invocation') g.add_argument('--mon-port', type=int, default=8765, metavar='PORT', help='HTTP monitoring port for viewer JSON access') - g.add_argument('--mon-protocol', type=str, metavar='PROTOCOL', choices=('http', 'https'), help='HTTP monitoring protocol for viewer JSON access') g.add_argument('--token-file', type=FileType(encoding='ascii'), metavar='PATH', help='Path to token file') g.add_argument('--ca-file', metavar='PATH', dest='cafile', type=str, help='Path to a file containing the PEM encoding of the server root certificates for tls connections.') - g.add_argument('--http', action='store_true', help='Use HTTP to connect to blob storage controller instead of GRPC') g.add_argument('--http-timeout', type=int, default=5, help='Timeout for blocking socket I/O operations during HTTP(s) queries') g.add_argument('--insecure', action='store_true', help='Allow insecure HTTPS fetching') + g.add_argument('--use-ip', action='store_true', help='Use IP addresses instead of hostnames when connecting to endpoints') connection_params = ConnectionParams() @@ -173,10 +231,6 @@ def set_connection_params_type(connection_params_type: type): connection_params = connection_params_type() -def make_url(host, path, params): - return connection_params.make_url(host, path, params) - - get_pdisk_id = attrgetter('NodeId', 'PDiskId') get_vslot_id = attrgetter('NodeId', 'PDiskId', 'VSlotId') get_vslot_id_json = itemgetter('NodeId', 'PDiskId', 'VDiskSlotId') @@ -257,48 +311,135 @@ class GroupSelectionError(Exception): pass -def query_random_host_with_retry(retries=5, explicit_host_param=None, http=False): +def filter_good_endpoints(endpoints): + return [endpoint for endpoint in endpoints if endpoint.host_with_port not in bad_hosts] + + +def get_random_endpoints_for_query(request_type=None, items_count=1, filter=None): + if request_type == 'http': + endpoints = connection_params.http_endpoints + elif request_type == 'grpc': + endpoints = connection_params.grpc_endpoints + else: + endpoints = connection_params.endpoints + endpoints = list(endpoints.values()) + if filter: + endpoints = filter(endpoints) + random.shuffle(endpoints) + return endpoints[:items_count] + + +def retry_query_with_endpoints(query, endpoints, request_type, query_name, max_retries=5): + try_index = 0 + result = None + for endpoint in endpoints: + try: + result = query(endpoint) + break + except Exception as e: + try_index += 1 + if isinstance(e, urllib.error.URLError): + bad_hosts.add(endpoint.host_with_port) + if not connection_params.quiet: + print(f'WARNING: failed to fetch data from host {endpoint.host_with_port} in {query_name}: {e}', file=sys.stderr) + if request_type == 'http' and try_index == max_retries: + print('HINT: consider trying different protocol for endpoints when experiencing massive fetch failures from different hosts', file=sys.stderr) + if try_index == max_retries: + raise ConnectionError("Can't connect to specified addresses") + return try_index, result + + +def query_random_host_with_retry(retries=5, request_type=None): def wrapper(func): sig = signature(func) @wraps(func) def wrapped(*args, **kwargs): - explicit_host = None - if explicit_host_param is not None: - explicit_host = sig.bind(*args, **kwargs).arguments.get(explicit_host_param) + binded = sig.bind(*args, **kwargs) + explicit_host = binded.arguments.pop('explicit_host', None) + host = binded.arguments.pop('host', None) + endpoint = binded.arguments.pop('endpoint', None) + + if endpoint is not None or host is not None: + return func(*args, **kwargs) + + if explicit_host is not None and explicit_host in connection_params.endpoints: + explicit_endpoint = connection_params.endpoints[explicit_host] + elif explicit_host is not None: + explicit_endpoint = connection_params.make_endpoint_info(f'{connection_params.mon_protocol}://{explicit_host}') + else: + explicit_endpoint = None - allowed_hosts = {explicit_host} if explicit_host is not None else connection_params.hosts - hosts_to_query = [] + def send_query(endpoint): + return func(*args, **kwargs, endpoint=endpoint) + setattr(send_query, '_name', func.__name__) try_index = 0 - while True: - # regenerate host list if it got empty - if not hosts_to_query: - hosts_to_query = list(allowed_hosts - bad_hosts) or list(allowed_hosts) - random.shuffle(hosts_to_query) - - host = hosts_to_query.pop() - try: - return func(*args, **kwargs, host=host) - except Exception as e: - try_index += 1 - if isinstance(e, urllib.error.URLError): - bad_hosts.add(host) - if not connection_params.quiet: - print(f'WARNING: failed to fetch data from host {host} in {func.__name__}: {e}', file=sys.stderr) - if http and try_index == retries: - print('HINT: consider trying different protocol for endpoints when experiencing massive fetch failures from different hosts', file=sys.stderr) - if try_index == retries: - raise ConnectionError("Can't connect to specified addresses") + result = None + if explicit_endpoint: + try_index, result = retry_query_with_endpoints(send_query, [explicit_endpoint] * retries, request_type, func.__name__, retries) + return result + + if result is not None: + return result + + print_if_verbose(connection_params.args, 'INFO: using random hosts', file=sys.stderr) + + endpoints = get_random_endpoints_for_query(request_type=request_type, items_count=retries - try_index, filter=filter_good_endpoints) + sub_try_index, result = retry_query_with_endpoints(send_query, endpoints, request_type, func.__name__, retries - try_index) + try_index += sub_try_index + + if result is not None: + return result + + if request_type == 'http' and connection_params.grpc_endpoints: + if not connection_params.http_endpoints and not connection_params.printed_warning_about_not_assigned_http_protocol: + print_if_not_quiet( + connection_params.args, + 'WARNING: endpoint for http requests is not specified, grpc endpoints will be used instead with conversion to http. ' + 'You can specify additional endpoints with "http" or "https" protocol.', + file=sys.stderr) + connection_params.printed_warning_about_not_assigned_http_protocol = True + print_if_verbose(connection_params.args, 'INFO: failed with http endpoints, try to use grpc endpoints', file=sys.stderr) + endpoints = get_random_endpoints_for_query(request_type='grpc', items_count=retries - try_index, filter=filter_good_endpoints) + sub_try_index, result = retry_query_with_endpoints(send_query, endpoints, request_type, func.__name__, retries - try_index) + try_index += sub_try_index + + if request_type == 'grpc' and connection_params.http_endpoints: + if not connection_params.grpc_endpoints and not connection_params.printed_warning_about_not_assigned_grpc_protocol: + print_if_not_quiet( + connection_params.args, + 'WARNING: endpoint for grpc requests is not specified, http endpoints will be used instead with conversion to grpc. ' + 'You can specify additional endpoints with "grpc" or "grpcs" protocol.', + file=sys.stderr) + connection_params.printed_warning_about_not_assigned_grpc_protocol = True + print_if_verbose(connection_params.args, 'INFO: failed with grpc endpoints, try to use http endpoints', file=sys.stderr) + endpoints = get_random_endpoints_for_query(request_type='http', items_count=retries - try_index, filter=filter_good_endpoints) + sub_try_index, result = retry_query_with_endpoints(send_query, endpoints, request_type, func.__name__, retries - try_index) + try_index += sub_try_index + + if result is not None: + return result + + print_if_verbose(connection_params.args, 'INFO: failed with all endpoints, retry them', file=sys.stderr) + + endpoints = get_random_endpoints_for_query(request_type=None, items_count=retries - try_index, filter=None) + endpoints = list(islice(cycle(endpoints), retries - try_index)) + sub_try_index, result = retry_query_with_endpoints(lambda endpoint: func(*args, **kwargs, endpoint=endpoint), endpoints, request_type, func.__name__, retries - try_index) + return result return wrapped return wrapper @inmemcache('fetch', ['path', 'params', 'explicit_host', 'fmt'], 'cache') -@query_random_host_with_retry(explicit_host_param='explicit_host', http=True) -def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True, method=None, data=None, content_type=None, accept=None): - url = connection_params.make_url(host, path, params) +@query_random_host_with_retry(request_type='http') +def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True, method=None, data=None, content_type=None, accept=None, endpoint=None): + if endpoint is None and host is not None: + endpoint = connection_params.make_endpoint_info(f'{connection_params.mon_protocol}://{host}') + if endpoint.protocol not in ('http', 'https'): + endpoint = connection_params.make_endpoint_info(f'{connection_params.mon_protocol}://{endpoint.host_with_mon_port}') + url = connection_params.make_url(endpoint, path, params) if connection_params.verbose: print('INFO: fetching %s' % url, file=sys.stderr) request = urllib.request.Request(url, data=data, method=method) @@ -321,15 +462,15 @@ def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True assert False, 'ERROR: invalid stream fmt specified: %s' % fmt -@query_random_host_with_retry(explicit_host_param='explicit_host') -def invoke_grpc(func, *params, explicit_host=None, host=None): +@query_random_host_with_retry(request_type='grpc') +def invoke_grpc(func, *params, explicit_host=None, endpoint=None): options = [ ('grpc.max_receive_message_length', 256 << 20), # 256 MiB ] if connection_params.verbose: p = ', '.join('<<< %s >>>' % text_format.MessageToString(param, as_one_line=True) for param in params) - print('INFO: issuing %s(%s) @%s:%d protocol %s' % (func, p, host, connection_params.grpc_port, - connection_params.mon_protocol), file=sys.stderr) + print('INFO: issuing %s(%s) @%s:%d protocol %s' % (func, p, endpoint.host, endpoint.grpc_port, + endpoint.protocol), file=sys.stderr) def work(channel): try: @@ -343,9 +484,9 @@ def invoke_grpc(func, *params, explicit_host=None, host=None): print('ERROR: exception %s' % e, file=sys.stderr) raise ConnectionError("Can't connect to specified addresses by gRPC protocol") - hostport = '%s:%d' % (host, connection_params.grpc_port) + hostport = endpoint.host_with_grpc_port retval = None - if connection_params.mon_protocol == 'grpcs': + if endpoint.protocol == 'grpcs': creds = grpc.ssl_channel_credentials(connection_params.get_cafile_data()) with grpc.secure_channel(hostport, creds, options) as channel: retval = work(channel) @@ -355,20 +496,11 @@ def invoke_grpc(func, *params, explicit_host=None, host=None): return retval -def invoke_bsc_request(request): - if connection_params.http: - tablet_id = 72057594037932033 - data = request.SerializeToString() - res = fetch('tablets/app', params=dict(TabletID=tablet_id, exec=1), fmt='raw', cache=False, method='POST', - data=data, content_type='application/x-protobuf', accept='application/x-protobuf') - m = kikimr_bsconfig.TConfigResponse() - m.MergeFromString(res) - return m - +def invoke_grpc_bsc_request(request, endpoint=None): bs_request = kikimr_msgbus.TBlobStorageConfigRequest(Domain=connection_params.domain, Request=request) if connection_params.token is not None: bs_request.SecurityToken = connection_params.token - bs_response = invoke_grpc('BlobStorageConfig', bs_request) + bs_response = invoke_grpc('BlobStorageConfig', bs_request, endpoint=endpoint) if bs_response.Status != 1: # remove security token from error message bs_request.SecurityToken = '' @@ -378,6 +510,24 @@ def invoke_bsc_request(request): return bs_response.BlobStorageConfigResponse +def invoke_http_bsc_request(request, endpoint=None): + tablet_id = 72057594037932033 + data = request.SerializeToString() + res = fetch('tablets/app', params=dict(TabletID=tablet_id, exec=1), fmt='raw', cache=False, method='POST', + data=data, content_type='application/x-protobuf', accept='application/x-protobuf', endpoint=endpoint) + m = kikimr_bsconfig.TConfigResponse() + m.MergeFromString(res) + return m + + +@query_random_host_with_retry(request_type=None) +def invoke_bsc_request(request, explicit_host=None, endpoint=None): + if endpoint.protocol in ('http', 'https'): + return invoke_http_bsc_request(request, endpoint=endpoint) + else: + return invoke_grpc_bsc_request(request, endpoint=endpoint) + + def cms_host_restart_request(user, host, reason, duration_usec, max_avail): cms_request = kikimr_msgbus.TCmsRequest() if connection_params.token is not None: @@ -901,20 +1051,6 @@ def get_vslots_by_vdisk_ids(base_config, vdisk_ids): return res -def fetch_vdisk_status(hostname): - res = [] - try: - j = fetch('viewer/json/vdiskinfo', dict(enums=1, node_id=0), hostname, cache=False) - except Exception: - return [] - for v in j.get('VDiskStateInfo', []): - try: - res.append((hostname, *get_vslot_id_json(v), *get_vdisk_id_json(v['VDiskId']), v['VDiskState'], v['Replicated'])) - except KeyError: - pass - return res - - def filter_healthy_groups(groups, node_mon_map, base_config, vslot_map): res = { group.GroupId: len(group.VSlotId) diff --git a/ydb/apps/dstool/lib/dstool_cmd_group_show_blob_info.py b/ydb/apps/dstool/lib/dstool_cmd_group_show_blob_info.py index a4ea71d65b..e39e155819 100644 --- a/ydb/apps/dstool/lib/dstool_cmd_group_show_blob_info.py +++ b/ydb/apps/dstool/lib/dstool_cmd_group_show_blob_info.py @@ -13,7 +13,7 @@ def add_options(p): def process_vslot(vslot, node_mon_map, blob_id): - page = 'vdisk/json/getblob' + page = 'node/{vslot.VSlotId.NodeId}/vdisk/json/getblob' params = { 'node_id': vslot.VSlotId.NodeId, 'pdisk_id': vslot.VSlotId.PDiskId, @@ -22,8 +22,7 @@ def process_vslot(vslot, node_mon_map, blob_id): 'to': blob_id, 'internals': 'yes' } - host = node_mon_map[vslot.VSlotId.NodeId] - data = common.fetch(page, params, host, 'raw').decode('utf-8') + data = common.fetch(page, params, fmt='raw').decode('utf-8') data = json.loads(data) res = [] if 'logoblobs' in data: diff --git a/ydb/apps/dstool/lib/dstool_cmd_group_show_storage_efficiency.py b/ydb/apps/dstool/lib/dstool_cmd_group_show_storage_efficiency.py index 5866dfacaa..d0a34bcbf8 100644 --- a/ydb/apps/dstool/lib/dstool_cmd_group_show_storage_efficiency.py +++ b/ydb/apps/dstool/lib/dstool_cmd_group_show_storage_efficiency.py @@ -41,13 +41,13 @@ table_output = create_table_output() def parse_vdisk_storage_efficiency(host, node_id, pdisk_id, vslot_id): idx_size, inplace_size, huge_size, comp_idx_size, comp_inplace_size, comp_huge_size = 0, 0, 0, 0, 0, 0 items, comp_items = 0, 0 - page = 'actors/vdisks/vdisk%09u_%09u' % (pdisk_id, vslot_id) + page = 'node/%d/actors/vdisks/vdisk%09u_%09u' % (node_id, pdisk_id, vslot_id) size_col = 'Idx / Inplaced / Huge Size' items_col = 'Items / WInplData / WHugeData' usage_col = 'Idx% / IdxB% / InplB% / HugeB%' count_items = True try: - data = common.fetch(page, {}, host, fmt='raw').decode('utf-8') + data = common.fetch(page, {}, fmt='raw').decode('utf-8') for t in re.finditer(r'<thead><tr>(.*?)</tr></thead><tbody>(.*?)</tbody>', data, re.S): cols = [m.group(1) for m in re.finditer('<th>(.*?)</th>', t.group(1))] if size_col not in cols or usage_col not in cols or items_col not in cols: @@ -111,7 +111,7 @@ def parse_vdisk_storage_efficiency(host, node_id, pdisk_id, vslot_id): huge_waste_bytes += num_chunks * (chunk_size - slot_size * num_slots_per_chunk) huge_defrag_bytes += (used_slot_count[slot_size, num_slots_per_chunk] + num_slots_per_chunk - 1) // num_slots_per_chunk * chunk_size except Exception as e: - print('Failed to process VDisk %s: %s' % (page, e)) + print('Failed to process VDisk %s from node %d: %s' % (page, node_id, e)) return None return idx_size, inplace_size, huge_size, comp_idx_size, comp_inplace_size, comp_huge_size, items, comp_items, \ huge_useful_bytes, huge_unused_bytes, huge_waste_bytes, huge_defrag_bytes diff --git a/ydb/apps/dstool/lib/dstool_cmd_pdisk_set.py b/ydb/apps/dstool/lib/dstool_cmd_pdisk_set.py index 1b92a5d0af..5f6b8b51cc 100644 --- a/ydb/apps/dstool/lib/dstool_cmd_pdisk_set.py +++ b/ydb/apps/dstool/lib/dstool_cmd_pdisk_set.py @@ -88,7 +88,7 @@ def do(args): } for pdisk_id, pdisk in sorted(pdisks.items()): try: - for row in common.fetch('viewer/json/pdiskinfo', params, node_id_to_host[pdisk.NodeId][0]).get('PDiskStateInfo', []): + for row in common.fetch(f'node/{pdisk.NodeId}/viewer/json/pdiskinfo', params).get('PDiskStateInfo', []): if 'State' in row and row['NodeId'] == pdisk.NodeId and row['PDiskId'] == pdisk.PDiskId: pdisk_status[pdisk_id] = not row['State'].endswith('Error') except Exception as e: diff --git a/ydb/apps/dstool/lib/grouptool.py b/ydb/apps/dstool/lib/grouptool.py index 36fc8f3235..ab43fbe3b8 100644 --- a/ydb/apps/dstool/lib/grouptool.py +++ b/ydb/apps/dstool/lib/grouptool.py @@ -303,52 +303,9 @@ def parse_vdisk_storage_from_http_api(node_id, pdisk_id, vslot_id): return res -def parse_vdisk_storage_legacy(host, pdisk_id, vslot_id): - page = 'actors/vdisks/vdisk%09u_%09u' % (pdisk_id, vslot_id) - data = common.fetch(page, dict(type='stat', dbname='LogoBlobs'), host, fmt='raw') - res = [] - m = table_re.search(str(data)) - for row in row_re.finditer(m.group(1)): - data = [ - strip_small(m.group(2)) if m_datatext is None else int(m_datatext.group(1)) - for m in cell_re.finditer(row.group(1)) - for m_datatext in [datatext_re.search(m.group(1))] # try to find data-text attr in <td> - ] - m = tabletid_re.match(data[0]) - tablet_id = int(m.group(1)) - channel = int(data[1]) - size = data[3] - if not isinstance(size, int): - factor = None - if size.endswith('KiB'): - size = size[:-3] - factor = 1024.0 - elif size.endswith('MiB'): - size = size[:-3] - factor = 1024.0**2 - elif size.endswith('GiB'): - size = size[:-3] - factor = 1024.0**3 - elif size.endswith('TiB'): - size = size[:-3] - factor = 1024.0**4 - elif size.endswith('PiB'): - size = size[:-3] - factor = 1024.0**5 - elif size.endswith('B'): - size = size[:-1] - factor = 1.0 - size = int(float(size) * factor) - res.append((tablet_id, channel, size)) - return res - - def parse_vdisk_storage(host, node_id, pdisk_id, vslot_id): try: - if common.connection_params.http: - return parse_vdisk_storage_from_http_api(node_id, pdisk_id, vslot_id) - else: - return parse_vdisk_storage_legacy(host, pdisk_id, vslot_id) + return parse_vdisk_storage_from_http_api(node_id, pdisk_id, vslot_id) except Exception as e: print('Failed to parse VDisk storage at host %s PDiskId# %d VSlotId# %d error# %s' % (host, pdisk_id, vslot_id, e), file=sys.stderr) return None diff --git a/ydb/apps/dstool/main.py b/ydb/apps/dstool/main.py index 4e2075b799..0042dd4073 100755 --- a/ydb/apps/dstool/main.py +++ b/ydb/apps/dstool/main.py @@ -4,6 +4,8 @@ from ydb.apps.dstool.lib.arg_parser import ArgumentParser import ydb.apps.dstool.lib.common as common import ydb.apps.dstool.lib.commands as commands +import sys + def main(): parser = ArgumentParser(description='YDB Distributed Storage Administration Tool') @@ -15,8 +17,12 @@ def main(): subparsers = parser.add_subparsers(help='Subcommands', dest='global_command', required=True) command_map = commands.make_command_map_by_structure(subparsers) args = parser.parse_args() - common.apply_args(args) - commands.run_command(command_map, args) + try: + common.apply_args(args) + commands.run_command(command_map, args) + except common.InvalidParameterError as e: + e.print() + sys.exit(1) if __name__ == '__main__': |