aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2024-11-20 12:24:56 +0300
committerGitHub <noreply@github.com>2024-11-20 12:24:56 +0300
commit2cf848f3e1004307b2b619c0263c6d7379acf7f4 (patch)
treeeaf66786eaf803213f55fddf653a3eee81391771
parent912022f5004fc384fd1996cae3f848bd1d44a28d (diff)
downloadydb-2cf848f3e1004307b2b619c0263c6d7379acf7f4.tar.gz
Improve endpoints experience in ydb-dstool (#11756)
-rw-r--r--ydb/apps/dstool/lib/common.py306
-rw-r--r--ydb/apps/dstool/lib/dstool_cmd_group_show_blob_info.py5
-rw-r--r--ydb/apps/dstool/lib/dstool_cmd_group_show_storage_efficiency.py6
-rw-r--r--ydb/apps/dstool/lib/dstool_cmd_pdisk_set.py2
-rw-r--r--ydb/apps/dstool/lib/grouptool.py45
-rwxr-xr-xydb/apps/dstool/main.py10
6 files changed, 236 insertions, 138 deletions
diff --git a/ydb/apps/dstool/lib/common.py b/ydb/apps/dstool/lib/common.py
index f16c365282..fbaa41fb96 100644
--- a/ydb/apps/dstool/lib/common.py
+++ b/ydb/apps/dstool/lib/common.py
@@ -17,11 +17,13 @@ from functools import wraps
from inspect import signature
from operator import attrgetter, itemgetter
from collections import defaultdict
+from itertools import cycle, islice
import ydb.core.protos.grpc_pb2_grpc as kikimr_grpc
import ydb.core.protos.msgbus_pb2 as kikimr_msgbus
import ydb.core.protos.blobstorage_config_pb2 as kikimr_bsconfig
import ydb.core.protos.blobstorage_base3_pb2 as kikimr_bs3
import ydb.core.protos.cms_pb2 as kikimr_cms
+from ydb.apps.dstool.lib.arg_parser import print_error_with_usage
import typing
@@ -34,22 +36,59 @@ EVirtualGroupState = kikimr_bs3.EVirtualGroupState
TGroupDecommitStatus = kikimr_bs3.TGroupDecommitStatus
+class InvalidParameterError(Exception):
+ """Exception raised for invalid command line parameter."""
+ def __init__(self, parser, parameter_name, parameter, message=""):
+ self.parser = parser
+ self.parameter = parameter
+ self.parameter_name = parameter_name
+ self.message = message
+ super().__init__(self.message)
+
+ def print(self):
+ print_error_with_usage(self.parser, f"ERROR: invalid parameter '{self.parameter_name}' with value '{self.parameter}'. {self.message}")
+
+
class EndpointInfo:
- def __init__(self, protocol: str, host: str, port: int):
+ def __init__(self, protocol: str, host: str, grpc_port: int, mon_port: int):
self.protocol = protocol
- self.port = port
+ self.grpc_port = grpc_port
+ self.mon_port = mon_port
self.host = host
+ @property
+ def full(self):
+ if self.protocol in ('http', 'https'):
+ return f'{self.protocol}://{self.host_with_mon_port}'
+ else:
+ return f'{self.protocol}://{self.host_with_grpc_port}'
+
+ @property
+ def host_with_port(self):
+ if self.protocol in ('http', 'https'):
+ return self.host_with_mon_port
+ else:
+ return self.host_with_grpc_port
+
+ @property
+ def host_with_grpc_port(self):
+ return f'{self.host}:{self.grpc_port}'
+
+ @property
+ def host_with_mon_port(self):
+ return f'{self.host}:{self.mon_port}'
+
class ConnectionParams:
- ENDPOINT_HELP = 'Default protocol is http, default port is 8765'
+ ENDPOINT_HELP = 'Endpoint is specified as PROTOCOL://HOST[:PORT]. Can be specified multiple times with different protocols.'
def __init__(self):
self.hosts = set()
self.endpoints = dict()
- self.grpc_port = None
- self.mon_port = None
- self.mon_protocol = None
+ self.grpc_port = 2135
+ self.mon_port = 8765
+ self.grpc_protocol = 'grpc'
+ self.mon_protocol = 'http'
self.token_type = None
self.token = None
self.domain = None
@@ -59,10 +98,20 @@ class ConnectionParams:
self.cafile = None
self.cadata = None
self.insecure = None
- self.http = None
+ self.parser = None
+ self.use_ip = None
+ self.http_endpoints = dict()
+ self.grpc_endpoints = dict()
+ self.args = None
+ self.printed_warning_about_not_assigned_http_protocol = False
+ self.printed_warning_about_not_assigned_grpc_protocol = False
def make_endpoint_info(self, endpoint: str):
- return EndpointInfo(*self.get_protocol_host_port(endpoint))
+ protocol, host, port = self.get_protocol_host_port(endpoint)
+ grpc_port = self.grpc_port if protocol not in ('grpc', 'grpcs') else port
+ mon_port = self.mon_port if protocol not in ('http', 'https') else port
+ endpoint_info = EndpointInfo(protocol, host, grpc_port, mon_port)
+ return endpoint_info
def get_protocol_host_port(self, endpoint):
protocol, sep, endpoint = endpoint.rpartition('://')
@@ -72,7 +121,7 @@ class ConnectionParams:
if sep == ':':
return protocol, endpoint, int(port)
else:
- return protocol, endpoint, self.mon_port
+ return protocol, endpoint, self.grpc_port if protocol in ('grpc', 'grpcs') else self.mon_port
def get_cafile_data(self):
if self.cafile is None:
@@ -96,10 +145,12 @@ class ConnectionParams:
netloc = new_netloc
return netloc
- def make_url(self, host, path, params):
- endpoint_info = self.endpoints[host] if host in self.endpoints else self.make_endpoint_info(host)
- netloc = self.get_netloc(endpoint_info.host, endpoint_info.port)
- return urllib.parse.urlunsplit((endpoint_info.protocol, netloc, path, urllib.parse.urlencode(params), ''))
+ def make_url(self, endpoint, path, params):
+ if self.use_ip:
+ location = self.get_netloc(endpoint.host, endpoint.grpc_port if endpoint.protocol in ('grpc', 'grpcs') else endpoint.mon_port)
+ else:
+ location = endpoint.host_with_port
+ return urllib.parse.urlunsplit((endpoint.protocol, location, path, urllib.parse.urlencode(params), ''))
def parse_token(self, token_file):
if token_file:
@@ -123,22 +174,30 @@ class ConnectionParams:
self.token_type = 'OAuth'
def apply_args(self, args, with_localhost=True):
+ self.args = args
self.grpc_port = args.grpc_port
self.mon_port = args.mon_port
- self.mon_protocol = args.mon_protocol
+ protocols = defaultdict(int)
if args.endpoint:
for endpoint in args.endpoint:
endpoint_info = self.make_endpoint_info(endpoint)
- if self.mon_protocol is None:
- self.mon_protocol = endpoint_info.protocol
- host_with_port = '{0}:{1}'.format(endpoint_info.host, endpoint_info.port)
+ if endpoint_info.protocol not in ('http', 'https', 'grpc', 'grpcs'):
+ raise InvalidParameterError(self.parser, '--endpoint', endpoint, 'Invalid protocol specified for endpoint')
+ protocols[endpoint_info.protocol] += 1
+ host_with_port = endpoint_info.host_with_port
self.hosts.add(endpoint_info.host)
self.endpoints[endpoint_info.host] = endpoint_info
self.endpoints[host_with_port] = endpoint_info
+ if endpoint_info.protocol in ('http', 'https'):
+ self.http_endpoints[host_with_port] = endpoint_info
+ else:
+ self.grpc_endpoints[host_with_port] = endpoint_info
- if self.mon_protocol is None:
- self.mon_protocol = 'http'
+ if 'grpc' not in protocols and 'grpcs' in protocols:
+ self.grpc_protocol = 'grpcs'
+ if 'http' not in protocols and 'https' in protocols:
+ self.mon_protocol = 'https'
self.parse_token(args.token_file)
self.domain = 1
@@ -147,9 +206,9 @@ class ConnectionParams:
self.http_timeout = args.http_timeout
self.cafile = args.cafile
self.insecure = args.insecure
- self.http = args.http
def add_host_access_options(self, parser, with_endpoint=True):
+ self.parser = parser
parser.add_argument('--verbose', '-v', action='store_true', help='Be verbose during operation')
parser.add_argument('--quiet', '-q', action='store_true', help="Don't show non-vital messages")
g = parser.add_argument_group('Server access options')
@@ -157,12 +216,11 @@ class ConnectionParams:
g.add_argument('--endpoint', '-e', metavar='[PROTOCOL://]HOST[:PORT]', type=str, required=True, action='append', help=ConnectionParams.ENDPOINT_HELP)
g.add_argument('--grpc-port', type=int, default=2135, metavar='PORT', help='GRPC port to use for procedure invocation')
g.add_argument('--mon-port', type=int, default=8765, metavar='PORT', help='HTTP monitoring port for viewer JSON access')
- g.add_argument('--mon-protocol', type=str, metavar='PROTOCOL', choices=('http', 'https'), help='HTTP monitoring protocol for viewer JSON access')
g.add_argument('--token-file', type=FileType(encoding='ascii'), metavar='PATH', help='Path to token file')
g.add_argument('--ca-file', metavar='PATH', dest='cafile', type=str, help='Path to a file containing the PEM encoding of the server root certificates for tls connections.')
- g.add_argument('--http', action='store_true', help='Use HTTP to connect to blob storage controller instead of GRPC')
g.add_argument('--http-timeout', type=int, default=5, help='Timeout for blocking socket I/O operations during HTTP(s) queries')
g.add_argument('--insecure', action='store_true', help='Allow insecure HTTPS fetching')
+ g.add_argument('--use-ip', action='store_true', help='Use IP addresses instead of hostnames when connecting to endpoints')
connection_params = ConnectionParams()
@@ -173,10 +231,6 @@ def set_connection_params_type(connection_params_type: type):
connection_params = connection_params_type()
-def make_url(host, path, params):
- return connection_params.make_url(host, path, params)
-
-
get_pdisk_id = attrgetter('NodeId', 'PDiskId')
get_vslot_id = attrgetter('NodeId', 'PDiskId', 'VSlotId')
get_vslot_id_json = itemgetter('NodeId', 'PDiskId', 'VDiskSlotId')
@@ -257,48 +311,135 @@ class GroupSelectionError(Exception):
pass
-def query_random_host_with_retry(retries=5, explicit_host_param=None, http=False):
+def filter_good_endpoints(endpoints):
+ return [endpoint for endpoint in endpoints if endpoint.host_with_port not in bad_hosts]
+
+
+def get_random_endpoints_for_query(request_type=None, items_count=1, filter=None):
+ if request_type == 'http':
+ endpoints = connection_params.http_endpoints
+ elif request_type == 'grpc':
+ endpoints = connection_params.grpc_endpoints
+ else:
+ endpoints = connection_params.endpoints
+ endpoints = list(endpoints.values())
+ if filter:
+ endpoints = filter(endpoints)
+ random.shuffle(endpoints)
+ return endpoints[:items_count]
+
+
+def retry_query_with_endpoints(query, endpoints, request_type, query_name, max_retries=5):
+ try_index = 0
+ result = None
+ for endpoint in endpoints:
+ try:
+ result = query(endpoint)
+ break
+ except Exception as e:
+ try_index += 1
+ if isinstance(e, urllib.error.URLError):
+ bad_hosts.add(endpoint.host_with_port)
+ if not connection_params.quiet:
+ print(f'WARNING: failed to fetch data from host {endpoint.host_with_port} in {query_name}: {e}', file=sys.stderr)
+ if request_type == 'http' and try_index == max_retries:
+ print('HINT: consider trying different protocol for endpoints when experiencing massive fetch failures from different hosts', file=sys.stderr)
+ if try_index == max_retries:
+ raise ConnectionError("Can't connect to specified addresses")
+ return try_index, result
+
+
+def query_random_host_with_retry(retries=5, request_type=None):
def wrapper(func):
sig = signature(func)
@wraps(func)
def wrapped(*args, **kwargs):
- explicit_host = None
- if explicit_host_param is not None:
- explicit_host = sig.bind(*args, **kwargs).arguments.get(explicit_host_param)
+ binded = sig.bind(*args, **kwargs)
+ explicit_host = binded.arguments.pop('explicit_host', None)
+ host = binded.arguments.pop('host', None)
+ endpoint = binded.arguments.pop('endpoint', None)
+
+ if endpoint is not None or host is not None:
+ return func(*args, **kwargs)
+
+ if explicit_host is not None and explicit_host in connection_params.endpoints:
+ explicit_endpoint = connection_params.endpoints[explicit_host]
+ elif explicit_host is not None:
+ explicit_endpoint = connection_params.make_endpoint_info(f'{connection_params.mon_protocol}://{explicit_host}')
+ else:
+ explicit_endpoint = None
- allowed_hosts = {explicit_host} if explicit_host is not None else connection_params.hosts
- hosts_to_query = []
+ def send_query(endpoint):
+ return func(*args, **kwargs, endpoint=endpoint)
+ setattr(send_query, '_name', func.__name__)
try_index = 0
- while True:
- # regenerate host list if it got empty
- if not hosts_to_query:
- hosts_to_query = list(allowed_hosts - bad_hosts) or list(allowed_hosts)
- random.shuffle(hosts_to_query)
-
- host = hosts_to_query.pop()
- try:
- return func(*args, **kwargs, host=host)
- except Exception as e:
- try_index += 1
- if isinstance(e, urllib.error.URLError):
- bad_hosts.add(host)
- if not connection_params.quiet:
- print(f'WARNING: failed to fetch data from host {host} in {func.__name__}: {e}', file=sys.stderr)
- if http and try_index == retries:
- print('HINT: consider trying different protocol for endpoints when experiencing massive fetch failures from different hosts', file=sys.stderr)
- if try_index == retries:
- raise ConnectionError("Can't connect to specified addresses")
+ result = None
+ if explicit_endpoint:
+ try_index, result = retry_query_with_endpoints(send_query, [explicit_endpoint] * retries, request_type, func.__name__, retries)
+ return result
+
+ if result is not None:
+ return result
+
+ print_if_verbose(connection_params.args, 'INFO: using random hosts', file=sys.stderr)
+
+ endpoints = get_random_endpoints_for_query(request_type=request_type, items_count=retries - try_index, filter=filter_good_endpoints)
+ sub_try_index, result = retry_query_with_endpoints(send_query, endpoints, request_type, func.__name__, retries - try_index)
+ try_index += sub_try_index
+
+ if result is not None:
+ return result
+
+ if request_type == 'http' and connection_params.grpc_endpoints:
+ if not connection_params.http_endpoints and not connection_params.printed_warning_about_not_assigned_http_protocol:
+ print_if_not_quiet(
+ connection_params.args,
+ 'WARNING: endpoint for http requests is not specified, grpc endpoints will be used instead with conversion to http. '
+ 'You can specify additional endpoints with "http" or "https" protocol.',
+ file=sys.stderr)
+ connection_params.printed_warning_about_not_assigned_http_protocol = True
+ print_if_verbose(connection_params.args, 'INFO: failed with http endpoints, try to use grpc endpoints', file=sys.stderr)
+ endpoints = get_random_endpoints_for_query(request_type='grpc', items_count=retries - try_index, filter=filter_good_endpoints)
+ sub_try_index, result = retry_query_with_endpoints(send_query, endpoints, request_type, func.__name__, retries - try_index)
+ try_index += sub_try_index
+
+ if request_type == 'grpc' and connection_params.http_endpoints:
+ if not connection_params.grpc_endpoints and not connection_params.printed_warning_about_not_assigned_grpc_protocol:
+ print_if_not_quiet(
+ connection_params.args,
+ 'WARNING: endpoint for grpc requests is not specified, http endpoints will be used instead with conversion to grpc. '
+ 'You can specify additional endpoints with "grpc" or "grpcs" protocol.',
+ file=sys.stderr)
+ connection_params.printed_warning_about_not_assigned_grpc_protocol = True
+ print_if_verbose(connection_params.args, 'INFO: failed with grpc endpoints, try to use http endpoints', file=sys.stderr)
+ endpoints = get_random_endpoints_for_query(request_type='http', items_count=retries - try_index, filter=filter_good_endpoints)
+ sub_try_index, result = retry_query_with_endpoints(send_query, endpoints, request_type, func.__name__, retries - try_index)
+ try_index += sub_try_index
+
+ if result is not None:
+ return result
+
+ print_if_verbose(connection_params.args, 'INFO: failed with all endpoints, retry them', file=sys.stderr)
+
+ endpoints = get_random_endpoints_for_query(request_type=None, items_count=retries - try_index, filter=None)
+ endpoints = list(islice(cycle(endpoints), retries - try_index))
+ sub_try_index, result = retry_query_with_endpoints(lambda endpoint: func(*args, **kwargs, endpoint=endpoint), endpoints, request_type, func.__name__, retries - try_index)
+ return result
return wrapped
return wrapper
@inmemcache('fetch', ['path', 'params', 'explicit_host', 'fmt'], 'cache')
-@query_random_host_with_retry(explicit_host_param='explicit_host', http=True)
-def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True, method=None, data=None, content_type=None, accept=None):
- url = connection_params.make_url(host, path, params)
+@query_random_host_with_retry(request_type='http')
+def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True, method=None, data=None, content_type=None, accept=None, endpoint=None):
+ if endpoint is None and host is not None:
+ endpoint = connection_params.make_endpoint_info(f'{connection_params.mon_protocol}://{host}')
+ if endpoint.protocol not in ('http', 'https'):
+ endpoint = connection_params.make_endpoint_info(f'{connection_params.mon_protocol}://{endpoint.host_with_mon_port}')
+ url = connection_params.make_url(endpoint, path, params)
if connection_params.verbose:
print('INFO: fetching %s' % url, file=sys.stderr)
request = urllib.request.Request(url, data=data, method=method)
@@ -321,15 +462,15 @@ def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True
assert False, 'ERROR: invalid stream fmt specified: %s' % fmt
-@query_random_host_with_retry(explicit_host_param='explicit_host')
-def invoke_grpc(func, *params, explicit_host=None, host=None):
+@query_random_host_with_retry(request_type='grpc')
+def invoke_grpc(func, *params, explicit_host=None, endpoint=None):
options = [
('grpc.max_receive_message_length', 256 << 20), # 256 MiB
]
if connection_params.verbose:
p = ', '.join('<<< %s >>>' % text_format.MessageToString(param, as_one_line=True) for param in params)
- print('INFO: issuing %s(%s) @%s:%d protocol %s' % (func, p, host, connection_params.grpc_port,
- connection_params.mon_protocol), file=sys.stderr)
+ print('INFO: issuing %s(%s) @%s:%d protocol %s' % (func, p, endpoint.host, endpoint.grpc_port,
+ endpoint.protocol), file=sys.stderr)
def work(channel):
try:
@@ -343,9 +484,9 @@ def invoke_grpc(func, *params, explicit_host=None, host=None):
print('ERROR: exception %s' % e, file=sys.stderr)
raise ConnectionError("Can't connect to specified addresses by gRPC protocol")
- hostport = '%s:%d' % (host, connection_params.grpc_port)
+ hostport = endpoint.host_with_grpc_port
retval = None
- if connection_params.mon_protocol == 'grpcs':
+ if endpoint.protocol == 'grpcs':
creds = grpc.ssl_channel_credentials(connection_params.get_cafile_data())
with grpc.secure_channel(hostport, creds, options) as channel:
retval = work(channel)
@@ -355,20 +496,11 @@ def invoke_grpc(func, *params, explicit_host=None, host=None):
return retval
-def invoke_bsc_request(request):
- if connection_params.http:
- tablet_id = 72057594037932033
- data = request.SerializeToString()
- res = fetch('tablets/app', params=dict(TabletID=tablet_id, exec=1), fmt='raw', cache=False, method='POST',
- data=data, content_type='application/x-protobuf', accept='application/x-protobuf')
- m = kikimr_bsconfig.TConfigResponse()
- m.MergeFromString(res)
- return m
-
+def invoke_grpc_bsc_request(request, endpoint=None):
bs_request = kikimr_msgbus.TBlobStorageConfigRequest(Domain=connection_params.domain, Request=request)
if connection_params.token is not None:
bs_request.SecurityToken = connection_params.token
- bs_response = invoke_grpc('BlobStorageConfig', bs_request)
+ bs_response = invoke_grpc('BlobStorageConfig', bs_request, endpoint=endpoint)
if bs_response.Status != 1:
# remove security token from error message
bs_request.SecurityToken = ''
@@ -378,6 +510,24 @@ def invoke_bsc_request(request):
return bs_response.BlobStorageConfigResponse
+def invoke_http_bsc_request(request, endpoint=None):
+ tablet_id = 72057594037932033
+ data = request.SerializeToString()
+ res = fetch('tablets/app', params=dict(TabletID=tablet_id, exec=1), fmt='raw', cache=False, method='POST',
+ data=data, content_type='application/x-protobuf', accept='application/x-protobuf', endpoint=endpoint)
+ m = kikimr_bsconfig.TConfigResponse()
+ m.MergeFromString(res)
+ return m
+
+
+@query_random_host_with_retry(request_type=None)
+def invoke_bsc_request(request, explicit_host=None, endpoint=None):
+ if endpoint.protocol in ('http', 'https'):
+ return invoke_http_bsc_request(request, endpoint=endpoint)
+ else:
+ return invoke_grpc_bsc_request(request, endpoint=endpoint)
+
+
def cms_host_restart_request(user, host, reason, duration_usec, max_avail):
cms_request = kikimr_msgbus.TCmsRequest()
if connection_params.token is not None:
@@ -901,20 +1051,6 @@ def get_vslots_by_vdisk_ids(base_config, vdisk_ids):
return res
-def fetch_vdisk_status(hostname):
- res = []
- try:
- j = fetch('viewer/json/vdiskinfo', dict(enums=1, node_id=0), hostname, cache=False)
- except Exception:
- return []
- for v in j.get('VDiskStateInfo', []):
- try:
- res.append((hostname, *get_vslot_id_json(v), *get_vdisk_id_json(v['VDiskId']), v['VDiskState'], v['Replicated']))
- except KeyError:
- pass
- return res
-
-
def filter_healthy_groups(groups, node_mon_map, base_config, vslot_map):
res = {
group.GroupId: len(group.VSlotId)
diff --git a/ydb/apps/dstool/lib/dstool_cmd_group_show_blob_info.py b/ydb/apps/dstool/lib/dstool_cmd_group_show_blob_info.py
index a4ea71d65b..e39e155819 100644
--- a/ydb/apps/dstool/lib/dstool_cmd_group_show_blob_info.py
+++ b/ydb/apps/dstool/lib/dstool_cmd_group_show_blob_info.py
@@ -13,7 +13,7 @@ def add_options(p):
def process_vslot(vslot, node_mon_map, blob_id):
- page = 'vdisk/json/getblob'
+ page = f'node/{vslot.VSlotId.NodeId}/vdisk/json/getblob'
params = {
'node_id': vslot.VSlotId.NodeId,
'pdisk_id': vslot.VSlotId.PDiskId,
@@ -22,8 +22,7 @@ def process_vslot(vslot, node_mon_map, blob_id):
'to': blob_id,
'internals': 'yes'
}
- host = node_mon_map[vslot.VSlotId.NodeId]
- data = common.fetch(page, params, host, 'raw').decode('utf-8')
+ data = common.fetch(page, params, fmt='raw').decode('utf-8')
data = json.loads(data)
res = []
if 'logoblobs' in data:
diff --git a/ydb/apps/dstool/lib/dstool_cmd_group_show_storage_efficiency.py b/ydb/apps/dstool/lib/dstool_cmd_group_show_storage_efficiency.py
index 5866dfacaa..d0a34bcbf8 100644
--- a/ydb/apps/dstool/lib/dstool_cmd_group_show_storage_efficiency.py
+++ b/ydb/apps/dstool/lib/dstool_cmd_group_show_storage_efficiency.py
@@ -41,13 +41,13 @@ table_output = create_table_output()
def parse_vdisk_storage_efficiency(host, node_id, pdisk_id, vslot_id):
idx_size, inplace_size, huge_size, comp_idx_size, comp_inplace_size, comp_huge_size = 0, 0, 0, 0, 0, 0
items, comp_items = 0, 0
- page = 'actors/vdisks/vdisk%09u_%09u' % (pdisk_id, vslot_id)
+ page = 'node/%d/actors/vdisks/vdisk%09u_%09u' % (node_id, pdisk_id, vslot_id)
size_col = 'Idx / Inplaced / Huge Size'
items_col = 'Items / WInplData / WHugeData'
usage_col = 'Idx% / IdxB% / InplB% / HugeB%'
count_items = True
try:
- data = common.fetch(page, {}, host, fmt='raw').decode('utf-8')
+ data = common.fetch(page, {}, fmt='raw').decode('utf-8')
for t in re.finditer(r'<thead><tr>(.*?)</tr></thead><tbody>(.*?)</tbody>', data, re.S):
cols = [m.group(1) for m in re.finditer('<th>(.*?)</th>', t.group(1))]
if size_col not in cols or usage_col not in cols or items_col not in cols:
@@ -111,7 +111,7 @@ def parse_vdisk_storage_efficiency(host, node_id, pdisk_id, vslot_id):
huge_waste_bytes += num_chunks * (chunk_size - slot_size * num_slots_per_chunk)
huge_defrag_bytes += (used_slot_count[slot_size, num_slots_per_chunk] + num_slots_per_chunk - 1) // num_slots_per_chunk * chunk_size
except Exception as e:
- print('Failed to process VDisk %s: %s' % (page, e))
+ print('Failed to process VDisk %s from node %d: %s' % (page, node_id, e))
return None
return idx_size, inplace_size, huge_size, comp_idx_size, comp_inplace_size, comp_huge_size, items, comp_items, \
huge_useful_bytes, huge_unused_bytes, huge_waste_bytes, huge_defrag_bytes
diff --git a/ydb/apps/dstool/lib/dstool_cmd_pdisk_set.py b/ydb/apps/dstool/lib/dstool_cmd_pdisk_set.py
index 1b92a5d0af..5f6b8b51cc 100644
--- a/ydb/apps/dstool/lib/dstool_cmd_pdisk_set.py
+++ b/ydb/apps/dstool/lib/dstool_cmd_pdisk_set.py
@@ -88,7 +88,7 @@ def do(args):
}
for pdisk_id, pdisk in sorted(pdisks.items()):
try:
- for row in common.fetch('viewer/json/pdiskinfo', params, node_id_to_host[pdisk.NodeId][0]).get('PDiskStateInfo', []):
+ for row in common.fetch(f'node/{pdisk.NodeId}/viewer/json/pdiskinfo', params).get('PDiskStateInfo', []):
if 'State' in row and row['NodeId'] == pdisk.NodeId and row['PDiskId'] == pdisk.PDiskId:
pdisk_status[pdisk_id] = not row['State'].endswith('Error')
except Exception as e:
diff --git a/ydb/apps/dstool/lib/grouptool.py b/ydb/apps/dstool/lib/grouptool.py
index 36fc8f3235..ab43fbe3b8 100644
--- a/ydb/apps/dstool/lib/grouptool.py
+++ b/ydb/apps/dstool/lib/grouptool.py
@@ -303,52 +303,9 @@ def parse_vdisk_storage_from_http_api(node_id, pdisk_id, vslot_id):
return res
-def parse_vdisk_storage_legacy(host, pdisk_id, vslot_id):
- page = 'actors/vdisks/vdisk%09u_%09u' % (pdisk_id, vslot_id)
- data = common.fetch(page, dict(type='stat', dbname='LogoBlobs'), host, fmt='raw')
- res = []
- m = table_re.search(str(data))
- for row in row_re.finditer(m.group(1)):
- data = [
- strip_small(m.group(2)) if m_datatext is None else int(m_datatext.group(1))
- for m in cell_re.finditer(row.group(1))
- for m_datatext in [datatext_re.search(m.group(1))] # try to find data-text attr in <td>
- ]
- m = tabletid_re.match(data[0])
- tablet_id = int(m.group(1))
- channel = int(data[1])
- size = data[3]
- if not isinstance(size, int):
- factor = None
- if size.endswith('KiB'):
- size = size[:-3]
- factor = 1024.0
- elif size.endswith('MiB'):
- size = size[:-3]
- factor = 1024.0**2
- elif size.endswith('GiB'):
- size = size[:-3]
- factor = 1024.0**3
- elif size.endswith('TiB'):
- size = size[:-3]
- factor = 1024.0**4
- elif size.endswith('PiB'):
- size = size[:-3]
- factor = 1024.0**5
- elif size.endswith('B'):
- size = size[:-1]
- factor = 1.0
- size = int(float(size) * factor)
- res.append((tablet_id, channel, size))
- return res
-
-
def parse_vdisk_storage(host, node_id, pdisk_id, vslot_id):
try:
- if common.connection_params.http:
- return parse_vdisk_storage_from_http_api(node_id, pdisk_id, vslot_id)
- else:
- return parse_vdisk_storage_legacy(host, pdisk_id, vslot_id)
+ return parse_vdisk_storage_from_http_api(node_id, pdisk_id, vslot_id)
except Exception as e:
print('Failed to parse VDisk storage at host %s PDiskId# %d VSlotId# %d error# %s' % (host, pdisk_id, vslot_id, e), file=sys.stderr)
return None
diff --git a/ydb/apps/dstool/main.py b/ydb/apps/dstool/main.py
index 4e2075b799..0042dd4073 100755
--- a/ydb/apps/dstool/main.py
+++ b/ydb/apps/dstool/main.py
@@ -4,6 +4,8 @@ from ydb.apps.dstool.lib.arg_parser import ArgumentParser
import ydb.apps.dstool.lib.common as common
import ydb.apps.dstool.lib.commands as commands
+import sys
+
def main():
parser = ArgumentParser(description='YDB Distributed Storage Administration Tool')
@@ -15,8 +17,12 @@ def main():
subparsers = parser.add_subparsers(help='Subcommands', dest='global_command', required=True)
command_map = commands.make_command_map_by_structure(subparsers)
args = parser.parse_args()
- common.apply_args(args)
- commands.run_command(command_map, args)
+ try:
+ common.apply_args(args)
+ commands.run_command(command_map, args)
+ except common.InvalidParameterError as e:
+ e.print()
+ sys.exit(1)
if __name__ == '__main__':