aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormzinal <zinal@ydb.tech>2023-07-17 16:13:00 +0300
committermzinal <zinal@ydb.tech>2023-07-17 16:13:00 +0300
commitda96ac06774420829265a7de590fad0781dbc073 (patch)
treec412aa39e95682698d845a2a8961a8d54bfbcee9
parenta8900aae714224b850fcd69326ba16381ec39b32 (diff)
downloadydb-da96ac06774420829265a7de590fad0781dbc073.tar.gz
KIKIMR-18071 support GRPC with TLS in ydb-dstool
-rw-r--r--ydb/apps/dstool/lib/common.py46
1 files changed, 39 insertions, 7 deletions
diff --git a/ydb/apps/dstool/lib/common.py b/ydb/apps/dstool/lib/common.py
index 3c7f657ef3..8898c1307b 100644
--- a/ydb/apps/dstool/lib/common.py
+++ b/ydb/apps/dstool/lib/common.py
@@ -42,12 +42,13 @@ class EndpointInfo:
class ConnectionParams:
- ENDPOINT_HELP = 'Default protocol is http, default port is 8765'
+ ENDPOINT_HELP = 'Default protocol is GRPC, default port is 2135'
def __init__(self):
self.hosts = set()
self.endpoints = dict()
self.grpc_port = None
+ self.grpc_protocol = None
self.mon_port = None
self.mon_protocol = None
self.token = None
@@ -93,21 +94,38 @@ class ConnectionParams:
def apply_args(self, args, with_localhost=True):
self.grpc_port = args.grpc_port
+ self.grpc_protocol = args.grpc_protocol
self.mon_port = args.mon_port
self.mon_protocol = args.mon_protocol
+ self.http = args.http
if args.endpoint:
for endpoint in args.endpoint:
endpoint_info = self.make_endpoint_info(endpoint)
+ if self.http is None or self.http is False:
+ if endpoint_info.protocol is not None:
+ self.http = ((endpoint_info.protocol == 'http') or (endpoint_info.protocol == 'https'))
+ if self.mon_port is None:
+ self.mon_port = endpoint_info.port
+ if self.grpc_port is None:
+ self.grpc_port = endpoint_info.port
if self.mon_protocol is None:
self.mon_protocol = endpoint_info.protocol
+ if self.grpc_protocol is None:
+ self.grpc_protocol = endpoint_info.protocol
host_with_port = '{0}:{1}'.format(endpoint_info.host, endpoint_info.port)
self.hosts.add(endpoint_info.host)
self.endpoints[endpoint_info.host] = endpoint_info
self.endpoints[host_with_port] = endpoint_info
+ if self.mon_port is None:
+ self.mon_port = 8765
if self.mon_protocol is None:
self.mon_protocol = 'http'
+ if self.grpc_port is None:
+ self.grpc_port = 2135
+ if self.grpc_protocol is None:
+ self.grpc_protocol = 'grpc'
if args.token_file:
self.token = args.token_file.readline().rstrip('\r\n')
@@ -128,7 +146,6 @@ class ConnectionParams:
self.http_timeout = args.http_timeout
self.cafile = args.cafile
self.insecure = args.insecure
- self.http = args.http
def add_host_access_options(self, parser, with_endpoint=True):
parser.add_argument('--verbose', '-v', action='store_true', help='Be verbose during operation')
@@ -136,14 +153,15 @@ class ConnectionParams:
g = parser.add_argument_group('Server access options')
if with_endpoint:
g.add_argument('--endpoint', '-e', metavar='[PROTOCOL://]HOST[:PORT]', type=str, required=True, action='append', help=ConnectionParams.ENDPOINT_HELP)
- g.add_argument('--grpc-port', type=int, default=2135, metavar='PORT', help='GRPC port to use for procedure invocation')
- g.add_argument('--mon-port', type=int, default=8765, metavar='PORT', help='HTTP monitoring port for viewer JSON access')
- g.add_argument('--mon-protocol', type=str, metavar='PROTOCOL', choices=('http', 'https'), help='HTTP monitoring protocol for viewer JSON access')
+ g.add_argument('--grpc-port', type=int, metavar='PORT', help='GRPC port to use for procedure invocation')
+ g.add_argument('--grpc-protocol', type=str, metavar='PROTOCOL', choices=('grpc', 'grpcs'), help='GRPC vs GRPCS protocol selector')
+ g.add_argument('--mon-port', type=int, metavar='PORT', help='HTTP monitoring port for viewer JSON access')
+ g.add_argument('--mon-protocol', type=str, metavar='PROTOCOL', choices=('http', 'https'), help='HTTP vs HTTPS protocol selector')
g.add_argument('--token-file', type=FileType(encoding='ascii'), metavar='PATH', help='Path to token file')
g.add_argument('--ca-file', metavar='PATH', dest='cafile', type=str, help='Path to a file containing the PEM encoding of the server root certificates for tls connections.')
g.add_argument('--http', action='store_true', help='Use HTTP to connect to blob storage controller instead of GRPC')
g.add_argument('--http-timeout', type=int, default=5, help='Timeout for blocking socket I/O operations during HTTP(s) queries')
- g.add_argument('--insecure', action='store_true', help='Allow insecure HTTPS fetching')
+ g.add_argument('--insecure', action='store_true', help='Allow insecure HTTPS connections (not applicable to GRPCS)')
connection_params = ConnectionParams()
@@ -302,12 +320,26 @@ def fetch(path, params={}, explicit_host=None, fmt='json', host=None, cache=True
assert False, 'ERROR: invalid stream fmt specified: %s' % fmt
+def grpc_channel_factory(host, options):
+ endpoint = '%s:%d' % (host, connection_params.grpc_port)
+ if connection_params.grpc_protocol == 'grpc':
+ return grpc.insecure_channel(endpoint, options)
+ if connection_params.grpc_protocol != 'grpcs':
+ raise ConnectionError("Illegal GRPC protocol name: %s" % connection_params.grpc_protocol)
+ if connection_params.cafile is None:
+ raise ConnectionError("Missing CA certificate option for GRPCS connection")
+ creds = None
+ with open(connection_params.cafile, 'rb') as f:
+ creds = grpc.ssl_channel_credentials(f.read())
+ return grpc.secure_channel(endpoint, creds, options)
+
+
@query_random_host_with_retry(explicit_host_param='explicit_host')
def invoke_grpc(func, *params, explicit_host=None, host=None):
options = [
('grpc.max_receive_message_length', 256 << 20), # 256 MiB
]
- with grpc.insecure_channel('%s:%d' % (host, connection_params.grpc_port), options) as channel:
+ with grpc_channel_factory(host, options) as channel:
if connection_params.verbose:
p = ', '.join('<<< %s >>>' % text_format.MessageToString(param, as_one_line=True) for param in params)
print('INFO: issuing %s(%s) @%s:%d' % (func, p, host, connection_params.grpc_port), file=sys.stderr)