summaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/src/python/grpcio_tests/tests/interop/service.py
diff options
context:
space:
mode:
authorDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/libs/grpc/src/python/grpcio_tests/tests/interop/service.py
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/libs/grpc/src/python/grpcio_tests/tests/interop/service.py')
-rw-r--r--contrib/libs/grpc/src/python/grpcio_tests/tests/interop/service.py96
1 files changed, 96 insertions, 0 deletions
diff --git a/contrib/libs/grpc/src/python/grpcio_tests/tests/interop/service.py b/contrib/libs/grpc/src/python/grpcio_tests/tests/interop/service.py
new file mode 100644
index 00000000000..08bb0c45a24
--- /dev/null
+++ b/contrib/libs/grpc/src/python/grpcio_tests/tests/interop/service.py
@@ -0,0 +1,96 @@
+# Copyright 2019 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""The Python implementation of the TestServicer."""
+
+import time
+
+import grpc
+
+from src.proto.grpc.testing import empty_pb2
+from src.proto.grpc.testing import messages_pb2
+from src.proto.grpc.testing import test_pb2_grpc
+
+_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial"
+_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin"
+_US_IN_A_SECOND = 1000 * 1000
+
+
+def _maybe_echo_metadata(servicer_context):
+ """Copies metadata from request to response if it is present."""
+ invocation_metadata = dict(servicer_context.invocation_metadata())
+ if _INITIAL_METADATA_KEY in invocation_metadata:
+ initial_metadatum = (_INITIAL_METADATA_KEY,
+ invocation_metadata[_INITIAL_METADATA_KEY])
+ servicer_context.send_initial_metadata((initial_metadatum,))
+ if _TRAILING_METADATA_KEY in invocation_metadata:
+ trailing_metadatum = (_TRAILING_METADATA_KEY,
+ invocation_metadata[_TRAILING_METADATA_KEY])
+ servicer_context.set_trailing_metadata((trailing_metadatum,))
+
+
+def _maybe_echo_status_and_message(request, servicer_context):
+ """Sets the response context code and details if the request asks for them"""
+ if request.HasField('response_status'):
+ servicer_context.set_code(request.response_status.code)
+ servicer_context.set_details(request.response_status.message)
+
+
+class TestService(test_pb2_grpc.TestServiceServicer):
+
+ def EmptyCall(self, request, context):
+ _maybe_echo_metadata(context)
+ return empty_pb2.Empty()
+
+ def UnaryCall(self, request, context):
+ _maybe_echo_metadata(context)
+ _maybe_echo_status_and_message(request, context)
+ return messages_pb2.SimpleResponse(
+ payload=messages_pb2.Payload(type=messages_pb2.COMPRESSABLE,
+ body=b'\x00' * request.response_size))
+
+ def StreamingOutputCall(self, request, context):
+ _maybe_echo_status_and_message(request, context)
+ for response_parameters in request.response_parameters:
+ if response_parameters.interval_us != 0:
+ time.sleep(response_parameters.interval_us / _US_IN_A_SECOND)
+ yield messages_pb2.StreamingOutputCallResponse(
+ payload=messages_pb2.Payload(type=request.response_type,
+ body=b'\x00' *
+ response_parameters.size))
+
+ def StreamingInputCall(self, request_iterator, context):
+ aggregate_size = 0
+ for request in request_iterator:
+ if request.payload is not None and request.payload.body:
+ aggregate_size += len(request.payload.body)
+ return messages_pb2.StreamingInputCallResponse(
+ aggregated_payload_size=aggregate_size)
+
+ def FullDuplexCall(self, request_iterator, context):
+ _maybe_echo_metadata(context)
+ for request in request_iterator:
+ _maybe_echo_status_and_message(request, context)
+ for response_parameters in request.response_parameters:
+ if response_parameters.interval_us != 0:
+ time.sleep(response_parameters.interval_us /
+ _US_IN_A_SECOND)
+ yield messages_pb2.StreamingOutputCallResponse(
+ payload=messages_pb2.Payload(type=request.payload.type,
+ body=b'\x00' *
+ response_parameters.size))
+
+ # NOTE(nathaniel): Apparently this is the same as the full-duplex call?
+ # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)...
+ def HalfDuplexCall(self, request_iterator, context):
+ return self.FullDuplexCall(request_iterator, context)