1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
diff --git a/src/python/grpcio/grpc/aio/_call.py b/src/python/grpcio/grpc/aio/_call.py
index 82b0d3ce5..24f209065 100644
--- a/src/python/grpcio/grpc/aio/_call.py
+++ b/src/python/grpcio/grpc/aio/_call.py
@@ -19,7 +19,15 @@ from functools import partial
import inspect
import logging
import traceback
-from typing import Any, AsyncIterator, Generator, Generic, Optional, Tuple
+from typing import (
+ Any,
+ AsyncIterator,
+ Generator,
+ Generic,
+ Optional,
+ Tuple,
+ Union,
+)
import grpc
from grpc import _common
@@ -29,6 +37,7 @@ from . import _base_call
from ._metadata import Metadata
from ._typing import DeserializingFunction
from ._typing import DoneCallbackType
+from ._typing import EOFType
from ._typing import MetadatumType
from ._typing import RequestIterableType
from ._typing import RequestType
@@ -380,7 +389,7 @@ class _StreamResponseMixin(Call):
raw_response, self._response_deserializer
)
- async def read(self) -> ResponseType:
+ async def read(self) -> Union[EOFType, ResponseType]:
if self.done():
await self._raise_for_status()
return cygrpc.EOF
diff --git a/src/python/grpcio/grpc/aio/_interceptor.py b/src/python/grpcio/grpc/aio/_interceptor.py
index e7ceb00fb..1401a08ee 100644
--- a/src/python/grpcio/grpc/aio/_interceptor.py
+++ b/src/python/grpcio/grpc/aio/_interceptor.py
@@ -43,6 +43,7 @@ from ._call import _RPC_HALF_CLOSED_DETAILS
from ._metadata import Metadata
from ._typing import DeserializingFunction
from ._typing import DoneCallbackType
+from ._typing import EOFType
from ._typing import RequestIterableType
from ._typing import RequestType
from ._typing import ResponseIterableType
@@ -494,12 +495,15 @@ class _InterceptedStreamResponseMixin:
)
return self._response_aiter
- async def read(self) -> ResponseType:
+ async def read(self) -> Union[EOFType, ResponseType]:
if self._response_aiter is None:
self._response_aiter = (
self._wait_for_interceptor_task_response_iterator()
)
- return await self._response_aiter.asend(None)
+ try:
+ return await self._response_aiter.asend(None)
+ except StopAsyncIteration:
+ return cygrpc.EOF
class _InterceptedStreamRequestMixin:
@@ -1141,7 +1145,7 @@ class UnaryStreamCallResponseIterator(
):
"""UnaryStreamCall class wich uses an alternative response iterator."""
- async def read(self) -> ResponseType:
+ async def read(self) -> Union[EOFType, ResponseType]:
# Behind the scenes everyting goes through the
# async iterator. So this path should not be reached.
raise NotImplementedError()
@@ -1152,7 +1156,7 @@ class StreamStreamCallResponseIterator(
):
"""StreamStreamCall class wich uses an alternative response iterator."""
- async def read(self) -> ResponseType:
+ async def read(self) -> Union[EOFType, ResponseType]:
# Behind the scenes everyting goes through the
# async iterator. So this path should not be reached.
raise NotImplementedError()
|