diff --git a/src/python/grpcio/grpc/aio/_call.py b/src/python/grpcio/grpc/aio/_call.py index 82b0d3ce5..24f209065 100644 --- a/src/python/grpcio/grpc/aio/_call.py +++ b/src/python/grpcio/grpc/aio/_call.py @@ -19,7 +19,15 @@ from functools import partial import inspect import logging import traceback -from typing import Any, AsyncIterator, Generator, Generic, Optional, Tuple +from typing import ( + Any, + AsyncIterator, + Generator, + Generic, + Optional, + Tuple, + Union, +) import grpc from grpc import _common @@ -29,6 +37,7 @@ from . import _base_call from ._metadata import Metadata from ._typing import DeserializingFunction from ._typing import DoneCallbackType +from ._typing import EOFType from ._typing import MetadatumType from ._typing import RequestIterableType from ._typing import RequestType @@ -380,7 +389,7 @@ class _StreamResponseMixin(Call): raw_response, self._response_deserializer ) - async def read(self) -> ResponseType: + async def read(self) -> Union[EOFType, ResponseType]: if self.done(): await self._raise_for_status() return cygrpc.EOF diff --git a/src/python/grpcio/grpc/aio/_interceptor.py b/src/python/grpcio/grpc/aio/_interceptor.py index e7ceb00fb..1401a08ee 100644 --- a/src/python/grpcio/grpc/aio/_interceptor.py +++ b/src/python/grpcio/grpc/aio/_interceptor.py @@ -43,6 +43,7 @@ from ._call import _RPC_HALF_CLOSED_DETAILS from ._metadata import Metadata from ._typing import DeserializingFunction from ._typing import DoneCallbackType +from ._typing import EOFType from ._typing import RequestIterableType from ._typing import RequestType from ._typing import ResponseIterableType @@ -494,12 +495,15 @@ class _InterceptedStreamResponseMixin: ) return self._response_aiter - async def read(self) -> ResponseType: + async def read(self) -> Union[EOFType, ResponseType]: if self._response_aiter is None: self._response_aiter = ( self._wait_for_interceptor_task_response_iterator() ) - return await self._response_aiter.asend(None) + try: + return await self._response_aiter.asend(None) + except StopAsyncIteration: + return cygrpc.EOF class _InterceptedStreamRequestMixin: @@ -1141,7 +1145,7 @@ class UnaryStreamCallResponseIterator( ): """UnaryStreamCall class wich uses an alternative response iterator.""" - async def read(self) -> ResponseType: + async def read(self) -> Union[EOFType, ResponseType]: # Behind the scenes everyting goes through the # async iterator. So this path should not be reached. raise NotImplementedError() @@ -1152,7 +1156,7 @@ class StreamStreamCallResponseIterator( ): """StreamStreamCall class wich uses an alternative response iterator.""" - async def read(self) -> ResponseType: + async def read(self) -> Union[EOFType, ResponseType]: # Behind the scenes everyting goes through the # async iterator. So this path should not be reached. raise NotImplementedError()