diff options
author | robot-kikimr-dev <robot-kikimr-dev@yandex-team.com> | 2023-05-26 09:24:31 +0300 |
---|---|---|
committer | robot-kikimr-dev <robot-kikimr-dev@yandex-team.com> | 2023-05-26 09:24:31 +0300 |
commit | 5796cc86e89c091d6fb04d9099d756376059a02c (patch) | |
tree | 3915a029e7c307f6cbda3b7cf14c490d812e071c | |
parent | 31a1d179be9f1c6f46182d1b77b7899a78bf64c5 (diff) | |
download | ydb-5796cc86e89c091d6fb04d9099d756376059a02c.tar.gz |
YDB SDK Sync from git
-rw-r--r-- | ydb/public/sdk/python3/ydb/_grpc/grpcwrapper/ydb_topic.py | 69 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py | 53 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/aio/iam.py | 2 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/iam/auth.py | 2 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/issues.py | 5 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/topic.py | 4 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/ydb_version.py | 2 |
7 files changed, 100 insertions, 37 deletions
diff --git a/ydb/public/sdk/python3/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/public/sdk/python3/ydb/_grpc/grpcwrapper/ydb_topic.py index f20b80a90d1..5b5e294a215 100644 --- a/ydb/public/sdk/python3/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/public/sdk/python3/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import datetime import enum import typing @@ -8,6 +10,7 @@ from google.protobuf.message import Message from . import ydb_topic_public_types from ... import scheme +from ... import issues # Workaround for good IDE and universal for runtime if typing.TYPE_CHECKING: @@ -588,16 +591,32 @@ class StreamReadMessage: ) @dataclass - class PartitionSessionStatusRequest: + class PartitionSessionStatusRequest(IToProto): partition_session_id: int + def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusRequest: + return ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusRequest( + partition_session_id=self.partition_session_id + ) + @dataclass - class PartitionSessionStatusResponse: + class PartitionSessionStatusResponse(IFromProto): partition_session_id: int partition_offsets: "OffsetsRange" committed_offset: int write_time_high_watermark: float + @staticmethod + def from_proto( + msg: ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusResponse, + ) -> "StreamReadMessage.PartitionSessionStatusResponse": + return StreamReadMessage.PartitionSessionStatusResponse( + partition_session_id=msg.partition_session_id, + partition_offsets=OffsetsRange.from_proto(msg.partition_offsets), + committed_offset=msg.committed_offset, + write_time_high_watermark=msg.write_time_high_watermark, + ) + @dataclass class StartPartitionSessionRequest(IFromProto): partition_session: "StreamReadMessage.PartitionSession" @@ -632,15 +651,30 @@ class StreamReadMessage: return res @dataclass - class StopPartitionSessionRequest: + class StopPartitionSessionRequest(IFromProto): partition_session_id: int graceful: bool committed_offset: int + @staticmethod + def from_proto( + msg: ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRequest, + ) -> StreamReadMessage.StopPartitionSessionRequest: + return StreamReadMessage.StopPartitionSessionRequest( + partition_session_id=msg.partition_session_id, + graceful=msg.graceful, + committed_offset=msg.committed_offset, + ) + @dataclass - class StopPartitionSessionResponse: + class StopPartitionSessionResponse(IToProto): partition_session_id: int + def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.StopPartitionSessionResponse: + return ydb_topic_pb2.StreamReadMessage.StopPartitionSessionResponse( + partition_session_id=self.partition_session_id, + ) + @dataclass class FromClient(IToProto): client_message: "ReaderMessagesFromClientToServer" @@ -660,6 +694,10 @@ class StreamReadMessage: res.update_token_request.CopyFrom(self.client_message.to_proto()) elif isinstance(self.client_message, StreamReadMessage.StartPartitionSessionResponse): res.start_partition_session_response.CopyFrom(self.client_message.to_proto()) + elif isinstance(self.client_message, StreamReadMessage.StopPartitionSessionResponse): + res.stop_partition_session_response.CopyFrom(self.client_message.to_proto()) + elif isinstance(self.client_message, StreamReadMessage.PartitionSessionStatusRequest): + res.start_partition_session_response.CopyFrom(self.client_message.to_proto()) else: raise NotImplementedError("Unknown message type: %s" % type(self.client_message)) return res @@ -694,7 +732,14 @@ class StreamReadMessage: return StreamReadMessage.FromServer( server_status=server_status, server_message=StreamReadMessage.StartPartitionSessionRequest.from_proto( - msg.start_partition_session_request + msg.start_partition_session_request, + ), + ) + elif mess_type == "stop_partition_session_request": + return StreamReadMessage.FromServer( + server_status=server_status, + server_message=StreamReadMessage.StopPartitionSessionRequest.from_proto( + msg.stop_partition_session_request ), ) elif mess_type == "update_token_response": @@ -702,9 +747,17 @@ class StreamReadMessage: server_status=server_status, server_message=UpdateTokenResponse.from_proto(msg.update_token_response), ) - - # todo replace exception to log - raise NotImplementedError() + elif mess_type == "partition_session_status_response": + return StreamReadMessage.FromServer( + server_status=server_status, + server_message=StreamReadMessage.PartitionSessionStatusResponse.from_proto( + msg.partition_session_status_response + ), + ) + else: + raise issues.UnexpectedGrpcMessage( + "Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type + ) ReaderMessagesFromClientToServer = Union[ diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py index 539d6831cac..ebe7bd6b46c 100644 --- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py @@ -26,6 +26,9 @@ from .._grpc.grpcwrapper.ydb_topic import ( Codec, ) from .._errors import check_retriable_error +import logging + +logger = logging.getLogger(__name__) class TopicReaderError(YdbError): @@ -146,7 +149,6 @@ class ReaderReconnector: def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings): self._id = self._static_reader_reconnector_counter.inc_and_get() - self._settings = settings self._driver = driver self._background_tasks = set() @@ -395,39 +397,42 @@ class ReaderStream: ) ) while True: - message = await self._stream.receive() # type: StreamReadMessage.FromServer - _process_response(message.server_status) + try: + message = await self._stream.receive() # type: StreamReadMessage.FromServer + _process_response(message.server_status) - if isinstance(message.server_message, StreamReadMessage.ReadResponse): - self._on_read_response(message.server_message) + if isinstance(message.server_message, StreamReadMessage.ReadResponse): + self._on_read_response(message.server_message) - elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse): - self._on_commit_response(message.server_message) + elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse): + self._on_commit_response(message.server_message) - elif isinstance( - message.server_message, - StreamReadMessage.StartPartitionSessionRequest, - ): - self._on_start_partition_session(message.server_message) + elif isinstance( + message.server_message, + StreamReadMessage.StartPartitionSessionRequest, + ): + self._on_start_partition_session(message.server_message) - elif isinstance( - message.server_message, - StreamReadMessage.StopPartitionSessionRequest, - ): - self._on_partition_session_stop(message.server_message) + elif isinstance( + message.server_message, + StreamReadMessage.StopPartitionSessionRequest, + ): + self._on_partition_session_stop(message.server_message) - elif isinstance(message.server_message, UpdateTokenResponse): - self._update_token_event.set() + elif isinstance(message.server_message, UpdateTokenResponse): + self._update_token_event.set() - else: - raise NotImplementedError( - "Unexpected type of StreamReadMessage.FromServer message: %s" % message.server_message - ) + else: + raise issues.UnexpectedGrpcMessage( + "Unexpected message in _read_messages_loop: %s" % type(message.server_message) + ) + except issues.UnexpectedGrpcMessage as e: + logger.exception("unexpected message in stream reader: %s" % e) self._state_changed.set() except Exception as e: self._set_first_error(e) - raise + return async def _update_token_loop(self): while True: diff --git a/ydb/public/sdk/python3/ydb/aio/iam.py b/ydb/public/sdk/python3/ydb/aio/iam.py index 64490db9bbd..eab8faffe05 100644 --- a/ydb/public/sdk/python3/ydb/aio/iam.py +++ b/ydb/public/sdk/python3/ydb/aio/iam.py @@ -26,7 +26,7 @@ except ImportError: class TokenServiceCredentials(AbstractExpiringTokenCredentials): def __init__(self, iam_endpoint=None, iam_channel_credentials=None): super(TokenServiceCredentials, self).__init__() - assert iam_token_service_pb2_grpc is not None, "run pip install==ydb[yc] to use service account credentials" + assert iam_token_service_pb2_grpc is not None, 'run pip install "ydb[yc]" to use service account credentials' self._get_token_request_timeout = 10 self._iam_endpoint = "iam.api.cloud.yandex.net:443" if iam_endpoint is None else iam_endpoint self._iam_channel_credentials = {} if iam_channel_credentials is None else iam_channel_credentials diff --git a/ydb/public/sdk/python3/ydb/iam/auth.py b/ydb/public/sdk/python3/ydb/iam/auth.py index 2623818d774..82e7c9f6c8e 100644 --- a/ydb/public/sdk/python3/ydb/iam/auth.py +++ b/ydb/public/sdk/python3/ydb/iam/auth.py @@ -45,7 +45,7 @@ def get_jwt(account_id, access_key_id, private_key, jwt_expiration_timeout): class TokenServiceCredentials(credentials.AbstractExpiringTokenCredentials): def __init__(self, iam_endpoint=None, iam_channel_credentials=None, tracer=None): super(TokenServiceCredentials, self).__init__(tracer) - assert iam_token_service_pb2_grpc is not None, "run pip install==ydb[yc] to use service account credentials" + assert iam_token_service_pb2_grpc is not None, 'run pip install "ydb[yc]" to use service account credentials' self._get_token_request_timeout = 10 self._iam_token_service_pb2 = iam_token_service_pb2 self._iam_token_service_pb2_grpc = iam_token_service_pb2_grpc diff --git a/ydb/public/sdk/python3/ydb/issues.py b/ydb/public/sdk/python3/ydb/issues.py index f15c475cc64..a489d4e0662 100644 --- a/ydb/public/sdk/python3/ydb/issues.py +++ b/ydb/public/sdk/python3/ydb/issues.py @@ -156,6 +156,11 @@ class SessionPoolEmpty(Error, queue.Empty): status = StatusCode.SESSION_POOL_EMPTY +class UnexpectedGrpcMessage(Error): + def __init__(self, message: str): + super().__init__(message) + + def _format_issues(issues): if not issues: return "" diff --git a/ydb/public/sdk/python3/ydb/topic.py b/ydb/public/sdk/python3/ydb/topic.py index abf93903b7e..190f532908a 100644 --- a/ydb/public/sdk/python3/ydb/topic.py +++ b/ydb/public/sdk/python3/ydb/topic.py @@ -168,7 +168,7 @@ class TopicClientAsyncIO: if not decoder_executor: decoder_executor = self._executor - args = locals() + args = locals().copy() del args["self"] settings = TopicReaderSettings(**args) @@ -188,7 +188,7 @@ class TopicClientAsyncIO: encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None, encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool ) -> TopicWriterAsyncIO: - args = locals() + args = locals().copy() del args["self"] settings = TopicWriterSettings(**args) diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py index 5a43db3f46d..63ab73ab853 100644 --- a/ydb/public/sdk/python3/ydb/ydb_version.py +++ b/ydb/public/sdk/python3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.3.3" +VERSION = "3.3.4" |