diff options
author | mzinal <zinal@ydb.tech> | 2023-07-17 16:13:00 +0300 |
---|---|---|
committer | mzinal <zinal@ydb.tech> | 2023-07-17 16:13:00 +0300 |
commit | da96ac06774420829265a7de590fad0781dbc073 (patch) | |
tree | c412aa39e95682698d845a2a8961a8d54bfbcee9 | |
parent | a8900aae714224b850fcd69326ba16381ec39b32 (diff) | |
download | ydb-da96ac06774420829265a7de590fad0781dbc073.tar.gz |
KIKIMR-18071 support GRPC with TLS in ydb-dstool
-rw-r--r-- | ydb/apps/dstool/lib/common.py | 46 |
1 files changed, 39 insertions, 7 deletions
diff --git a/ydb/apps/dstool/lib/common.py b/ydb/apps/dstool/lib/common.py index 3c7f657ef3..8898c1307b 100644 --- a/ydb/apps/dstool/lib/common.py +++ b/ydb/apps/dstool/lib/common.py @@ -42,12 +42,13 @@ class EndpointInfo: class ConnectionParams: - ENDPOINT_HELP = 'Default protocol is http, default port is 8765' + ENDPOINT_HELP = 'Default protocol is GRPC, default port is 2135' def __init__(self): self.hosts = set() self.endpoints = dict() self.grpc_port = None + self.grpc_protocol = None self.mon_port = None self.mon_protocol = None self.token = None @@ -93,21 +94,38 @@ class ConnectionParams: def apply_args(self, args, with_localhost=True): self.grpc_port = args.grpc_port + self.grpc_protocol = args.grpc_protocol self.mon_port = args.mon_port self.mon_protocol = args.mon_protocol + self.http = args.http if args.endpoint: for endpoint in args.endpoint: endpoint_info = self.make_endpoint_info(endpoint) + if self.http is None or self.http is False: + if endpoint_info.protocol is not None: + self.http = ((endpoint_info.protocol == 'http') or (endpoint_info.protocol == 'https')) + if self.mon_port is None: + self.mon_port = endpoint_info.port + if self.grpc_port is None: + self.grpc_port = endpoint_info.port if self.mon_protocol is None: self.mon_protocol = endpoint_info.protocol + if self.grpc_protocol is None: + self.grpc_protocol = endpoint_info.protocol host_with_port = '{0}:{1}'.format(endpoint_info.host, endpoint_info.port) self.hosts.add(endpoint_info.host) self.endpoints[endpoint_info.host] = endpoint_info self.endpoints[host_with_port] = endpoint_info + if self.mon_port is None: + self.mon_port = 8765 if self.mon_protocol is None: self.mon_protocol = 'http' + if self.grpc_port is None: + self.grpc_port = 2135 + if self.grpc_protocol is None: + self.grpc_protocol = 'grpc' if args.token_file: self.token = args.token_file.readline().rstrip('\r\n') @@ -128,7 +146,6 @@ class ConnectionParams: self.http_timeout = args.http_timeout self.cafile = args.cafile self.insecure = args.insecure - self.http = args.http def add_host_access_options(self, parser, with_endpoint=True): parser.add_argument('--verbose', '-v', action='store_true', help='Be verbose during operation') @@ -136,14 +153,15 @@ class ConnectionParams: g = parser.add_argument_group('Server access options') if with_endpoint: g.add_argument('--endpoint', '-e', metavar='[PROTOCOL://]HOST[:PORT]', type=str, required=True, action='append', help=ConnectionParams.ENDPOINT_HELP) - g.add_argument('--grpc-port', type=int, default=2135, metavar='PORT', help='GRPC port to use for procedure invocation') - g.add_argument('--mon-port', type=int, default=8765, metavar='PORT', help='HTTP monitoring port for viewer JSON access') - g.add_argument('--mon-protocol', type=str, metavar='PROTOCOL', choices=('http', 'https'), help='HTTP monitoring protocol for viewer JSON access') + g.add_argument('--grpc-port', type=int, metavar='PORT', help='GRPC port to use for procedure invocation') + g.add_argument('--grpc-protocol', type=str, metavar='PROTOCOL', choices=('grpc', 'grpcs'), help='GRPC vs GRPCS protocol selector') + g.add_argument('--mon-port', type=int, metavar='PORT', help='HTTP monitoring port for viewer JSON access') + g.add_argument('--mon-protocol', type=str, metavar='PROTOCOL', choices=('http', 'https'), help='HTTP vs HTTPS protocol selector') g.add_argument('--token-file', type=FileType(encoding='ascii'), metavar='PATH', help='Path to token file') g.add_argument('--ca-file', metavar='PATH', dest='cafile', type=str, help='Path to a file containing the PEM encoding of the server root certificates for tls connections.') g.add_argument('--http', action='store_true', help='Use HTTP to connect to blob storage controller instead of GRPC') g.add_argument('--http-timeout', type=int, default=5, help='Timeout for blocking socket I/O operations during HTTP(s) queries') - g.add_argument('--insecure', action='store_true', help='Allow insecure HTTPS fetching') + g.add_argument('--insecure', action='store_true', help='Allow insecure HTTPS connections (not applicable to GRPCS)') connection_params = ConnectionParams() @@ -302,12 +320,26 @@ def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True assert False, 'ERROR: invalid stream fmt specified: %s' % fmt +def grpc_channel_factory(host, options): + endpoint = '%s:%d' % (host, connection_params.grpc_port) + if connection_params.grpc_protocol == 'grpc': + return grpc.insecure_channel(endpoint, options) + if connection_params.grpc_protocol != 'grpcs': + raise ConnectionError("Illegal GRPC protocol name: %s" % connection_params.grpc_protocol) + if connection_params.cafile is None: + raise ConnectionError("Missing CA certificate option for GRPCS connection") + creds = None + with open(connection_params.cafile, 'rb') as f: + creds = grpc.ssl_channel_credentials(f.read()) + return grpc.secure_channel(endpoint, creds, options) + + @query_random_host_with_retry(explicit_host_param='explicit_host') def invoke_grpc(func, *params, explicit_host=None, host=None): options = [ ('grpc.max_receive_message_length', 256 << 20), # 256 MiB ] - with grpc.insecure_channel('%s:%d' % (host, connection_params.grpc_port), options) as channel: + with grpc_channel_factory(host, options) as channel: if connection_params.verbose: p = ', '.join('<<< %s >>>' % text_format.MessageToString(param, as_one_line=True) for param in params) print('INFO: issuing %s(%s) @%s:%d' % (func, p, host, connection_params.grpc_port), file=sys.stderr) |