summaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/src/python/grpcio_tests/tests/stress/client.py
diff options
context:
space:
mode:
authorDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/libs/grpc/src/python/grpcio_tests/tests/stress/client.py
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/libs/grpc/src/python/grpcio_tests/tests/stress/client.py')
-rw-r--r--contrib/libs/grpc/src/python/grpcio_tests/tests/stress/client.py159
1 files changed, 159 insertions, 0 deletions
diff --git a/contrib/libs/grpc/src/python/grpcio_tests/tests/stress/client.py b/contrib/libs/grpc/src/python/grpcio_tests/tests/stress/client.py
new file mode 100644
index 00000000000..01c14ba3e20
--- /dev/null
+++ b/contrib/libs/grpc/src/python/grpcio_tests/tests/stress/client.py
@@ -0,0 +1,159 @@
+# Copyright 2016 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Entry point for running stress tests."""
+
+import argparse
+from concurrent import futures
+import threading
+
+import grpc
+from six.moves import queue
+from src.proto.grpc.testing import metrics_pb2_grpc
+from src.proto.grpc.testing import test_pb2_grpc
+
+from tests.interop import methods
+from tests.interop import resources
+from tests.qps import histogram
+from tests.stress import metrics_server
+from tests.stress import test_runner
+
+
+def _args():
+ parser = argparse.ArgumentParser(
+ description='gRPC Python stress test client')
+ parser.add_argument(
+ '--server_addresses',
+ help='comma separated list of hostname:port to run servers on',
+ default='localhost:8080',
+ type=str)
+ parser.add_argument(
+ '--test_cases',
+ help='comma separated list of testcase:weighting of tests to run',
+ default='large_unary:100',
+ type=str)
+ parser.add_argument('--test_duration_secs',
+ help='number of seconds to run the stress test',
+ default=-1,
+ type=int)
+ parser.add_argument('--num_channels_per_server',
+ help='number of channels per server',
+ default=1,
+ type=int)
+ parser.add_argument('--num_stubs_per_channel',
+ help='number of stubs to create per channel',
+ default=1,
+ type=int)
+ parser.add_argument('--metrics_port',
+ help='the port to listen for metrics requests on',
+ default=8081,
+ type=int)
+ parser.add_argument(
+ '--use_test_ca',
+ help='Whether to use our fake CA. Requires --use_tls=true',
+ default=False,
+ type=bool)
+ parser.add_argument('--use_tls',
+ help='Whether to use TLS',
+ default=False,
+ type=bool)
+ parser.add_argument('--server_host_override',
+ help='the server host to which to claim to connect',
+ type=str)
+ return parser.parse_args()
+
+
+def _test_case_from_arg(test_case_arg):
+ for test_case in methods.TestCase:
+ if test_case_arg == test_case.value:
+ return test_case
+ else:
+ raise ValueError('No test case {}!'.format(test_case_arg))
+
+
+def _parse_weighted_test_cases(test_case_args):
+ weighted_test_cases = {}
+ for test_case_arg in test_case_args.split(','):
+ name, weight = test_case_arg.split(':', 1)
+ test_case = _test_case_from_arg(name)
+ weighted_test_cases[test_case] = int(weight)
+ return weighted_test_cases
+
+
+def _get_channel(target, args):
+ if args.use_tls:
+ if args.use_test_ca:
+ root_certificates = resources.test_root_certificates()
+ else:
+ root_certificates = None # will load default roots.
+ channel_credentials = grpc.ssl_channel_credentials(
+ root_certificates=root_certificates)
+ options = ((
+ 'grpc.ssl_target_name_override',
+ args.server_host_override,
+ ),)
+ channel = grpc.secure_channel(target,
+ channel_credentials,
+ options=options)
+ else:
+ channel = grpc.insecure_channel(target)
+
+ # waits for the channel to be ready before we start sending messages
+ grpc.channel_ready_future(channel).result()
+ return channel
+
+
+def run_test(args):
+ test_cases = _parse_weighted_test_cases(args.test_cases)
+ test_server_targets = args.server_addresses.split(',')
+ # Propagate any client exceptions with a queue
+ exception_queue = queue.Queue()
+ stop_event = threading.Event()
+ hist = histogram.Histogram(1, 1)
+ runners = []
+
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=25))
+ metrics_pb2_grpc.add_MetricsServiceServicer_to_server(
+ metrics_server.MetricsServer(hist), server)
+ server.add_insecure_port('[::]:{}'.format(args.metrics_port))
+ server.start()
+
+ for test_server_target in test_server_targets:
+ for _ in range(args.num_channels_per_server):
+ channel = _get_channel(test_server_target, args)
+ for _ in range(args.num_stubs_per_channel):
+ stub = test_pb2_grpc.TestServiceStub(channel)
+ runner = test_runner.TestRunner(stub, test_cases, hist,
+ exception_queue, stop_event)
+ runners.append(runner)
+
+ for runner in runners:
+ runner.start()
+ try:
+ timeout_secs = args.test_duration_secs
+ if timeout_secs < 0:
+ timeout_secs = None
+ raise exception_queue.get(block=True, timeout=timeout_secs)
+ except queue.Empty:
+ # No exceptions thrown, success
+ pass
+ finally:
+ stop_event.set()
+ for runner in runners:
+ runner.join()
+ runner = None
+ server.stop(None)
+
+
+if __name__ == '__main__':
+ run_test(_args())