aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-kikimr-dev <robot-kikimr-dev@yandex-team.com>2023-05-26 09:24:31 +0300
committerrobot-kikimr-dev <robot-kikimr-dev@yandex-team.com>2023-05-26 09:24:31 +0300
commit5796cc86e89c091d6fb04d9099d756376059a02c (patch)
tree3915a029e7c307f6cbda3b7cf14c490d812e071c
parent31a1d179be9f1c6f46182d1b77b7899a78bf64c5 (diff)
downloadydb-5796cc86e89c091d6fb04d9099d756376059a02c.tar.gz
YDB SDK Sync from git
-rw-r--r--ydb/public/sdk/python3/ydb/_grpc/grpcwrapper/ydb_topic.py69
-rw-r--r--ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py53
-rw-r--r--ydb/public/sdk/python3/ydb/aio/iam.py2
-rw-r--r--ydb/public/sdk/python3/ydb/iam/auth.py2
-rw-r--r--ydb/public/sdk/python3/ydb/issues.py5
-rw-r--r--ydb/public/sdk/python3/ydb/topic.py4
-rw-r--r--ydb/public/sdk/python3/ydb/ydb_version.py2
7 files changed, 100 insertions, 37 deletions
diff --git a/ydb/public/sdk/python3/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/public/sdk/python3/ydb/_grpc/grpcwrapper/ydb_topic.py
index f20b80a90d1..5b5e294a215 100644
--- a/ydb/public/sdk/python3/ydb/_grpc/grpcwrapper/ydb_topic.py
+++ b/ydb/public/sdk/python3/ydb/_grpc/grpcwrapper/ydb_topic.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import datetime
import enum
import typing
@@ -8,6 +10,7 @@ from google.protobuf.message import Message
from . import ydb_topic_public_types
from ... import scheme
+from ... import issues
# Workaround for good IDE and universal for runtime
if typing.TYPE_CHECKING:
@@ -588,16 +591,32 @@ class StreamReadMessage:
)
@dataclass
- class PartitionSessionStatusRequest:
+ class PartitionSessionStatusRequest(IToProto):
partition_session_id: int
+ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusRequest:
+ return ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusRequest(
+ partition_session_id=self.partition_session_id
+ )
+
@dataclass
- class PartitionSessionStatusResponse:
+ class PartitionSessionStatusResponse(IFromProto):
partition_session_id: int
partition_offsets: "OffsetsRange"
committed_offset: int
write_time_high_watermark: float
+ @staticmethod
+ def from_proto(
+ msg: ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusResponse,
+ ) -> "StreamReadMessage.PartitionSessionStatusResponse":
+ return StreamReadMessage.PartitionSessionStatusResponse(
+ partition_session_id=msg.partition_session_id,
+ partition_offsets=OffsetsRange.from_proto(msg.partition_offsets),
+ committed_offset=msg.committed_offset,
+ write_time_high_watermark=msg.write_time_high_watermark,
+ )
+
@dataclass
class StartPartitionSessionRequest(IFromProto):
partition_session: "StreamReadMessage.PartitionSession"
@@ -632,15 +651,30 @@ class StreamReadMessage:
return res
@dataclass
- class StopPartitionSessionRequest:
+ class StopPartitionSessionRequest(IFromProto):
partition_session_id: int
graceful: bool
committed_offset: int
+ @staticmethod
+ def from_proto(
+ msg: ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRequest,
+ ) -> StreamReadMessage.StopPartitionSessionRequest:
+ return StreamReadMessage.StopPartitionSessionRequest(
+ partition_session_id=msg.partition_session_id,
+ graceful=msg.graceful,
+ committed_offset=msg.committed_offset,
+ )
+
@dataclass
- class StopPartitionSessionResponse:
+ class StopPartitionSessionResponse(IToProto):
partition_session_id: int
+ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.StopPartitionSessionResponse:
+ return ydb_topic_pb2.StreamReadMessage.StopPartitionSessionResponse(
+ partition_session_id=self.partition_session_id,
+ )
+
@dataclass
class FromClient(IToProto):
client_message: "ReaderMessagesFromClientToServer"
@@ -660,6 +694,10 @@ class StreamReadMessage:
res.update_token_request.CopyFrom(self.client_message.to_proto())
elif isinstance(self.client_message, StreamReadMessage.StartPartitionSessionResponse):
res.start_partition_session_response.CopyFrom(self.client_message.to_proto())
+ elif isinstance(self.client_message, StreamReadMessage.StopPartitionSessionResponse):
+ res.stop_partition_session_response.CopyFrom(self.client_message.to_proto())
+ elif isinstance(self.client_message, StreamReadMessage.PartitionSessionStatusRequest):
+ res.start_partition_session_response.CopyFrom(self.client_message.to_proto())
else:
raise NotImplementedError("Unknown message type: %s" % type(self.client_message))
return res
@@ -694,7 +732,14 @@ class StreamReadMessage:
return StreamReadMessage.FromServer(
server_status=server_status,
server_message=StreamReadMessage.StartPartitionSessionRequest.from_proto(
- msg.start_partition_session_request
+ msg.start_partition_session_request,
+ ),
+ )
+ elif mess_type == "stop_partition_session_request":
+ return StreamReadMessage.FromServer(
+ server_status=server_status,
+ server_message=StreamReadMessage.StopPartitionSessionRequest.from_proto(
+ msg.stop_partition_session_request
),
)
elif mess_type == "update_token_response":
@@ -702,9 +747,17 @@ class StreamReadMessage:
server_status=server_status,
server_message=UpdateTokenResponse.from_proto(msg.update_token_response),
)
-
- # todo replace exception to log
- raise NotImplementedError()
+ elif mess_type == "partition_session_status_response":
+ return StreamReadMessage.FromServer(
+ server_status=server_status,
+ server_message=StreamReadMessage.PartitionSessionStatusResponse.from_proto(
+ msg.partition_session_status_response
+ ),
+ )
+ else:
+ raise issues.UnexpectedGrpcMessage(
+ "Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type
+ )
ReaderMessagesFromClientToServer = Union[
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
index 539d6831cac..ebe7bd6b46c 100644
--- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
+++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
@@ -26,6 +26,9 @@ from .._grpc.grpcwrapper.ydb_topic import (
Codec,
)
from .._errors import check_retriable_error
+import logging
+
+logger = logging.getLogger(__name__)
class TopicReaderError(YdbError):
@@ -146,7 +149,6 @@ class ReaderReconnector:
def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings):
self._id = self._static_reader_reconnector_counter.inc_and_get()
-
self._settings = settings
self._driver = driver
self._background_tasks = set()
@@ -395,39 +397,42 @@ class ReaderStream:
)
)
while True:
- message = await self._stream.receive() # type: StreamReadMessage.FromServer
- _process_response(message.server_status)
+ try:
+ message = await self._stream.receive() # type: StreamReadMessage.FromServer
+ _process_response(message.server_status)
- if isinstance(message.server_message, StreamReadMessage.ReadResponse):
- self._on_read_response(message.server_message)
+ if isinstance(message.server_message, StreamReadMessage.ReadResponse):
+ self._on_read_response(message.server_message)
- elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse):
- self._on_commit_response(message.server_message)
+ elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse):
+ self._on_commit_response(message.server_message)
- elif isinstance(
- message.server_message,
- StreamReadMessage.StartPartitionSessionRequest,
- ):
- self._on_start_partition_session(message.server_message)
+ elif isinstance(
+ message.server_message,
+ StreamReadMessage.StartPartitionSessionRequest,
+ ):
+ self._on_start_partition_session(message.server_message)
- elif isinstance(
- message.server_message,
- StreamReadMessage.StopPartitionSessionRequest,
- ):
- self._on_partition_session_stop(message.server_message)
+ elif isinstance(
+ message.server_message,
+ StreamReadMessage.StopPartitionSessionRequest,
+ ):
+ self._on_partition_session_stop(message.server_message)
- elif isinstance(message.server_message, UpdateTokenResponse):
- self._update_token_event.set()
+ elif isinstance(message.server_message, UpdateTokenResponse):
+ self._update_token_event.set()
- else:
- raise NotImplementedError(
- "Unexpected type of StreamReadMessage.FromServer message: %s" % message.server_message
- )
+ else:
+ raise issues.UnexpectedGrpcMessage(
+ "Unexpected message in _read_messages_loop: %s" % type(message.server_message)
+ )
+ except issues.UnexpectedGrpcMessage as e:
+ logger.exception("unexpected message in stream reader: %s" % e)
self._state_changed.set()
except Exception as e:
self._set_first_error(e)
- raise
+ return
async def _update_token_loop(self):
while True:
diff --git a/ydb/public/sdk/python3/ydb/aio/iam.py b/ydb/public/sdk/python3/ydb/aio/iam.py
index 64490db9bbd..eab8faffe05 100644
--- a/ydb/public/sdk/python3/ydb/aio/iam.py
+++ b/ydb/public/sdk/python3/ydb/aio/iam.py
@@ -26,7 +26,7 @@ except ImportError:
class TokenServiceCredentials(AbstractExpiringTokenCredentials):
def __init__(self, iam_endpoint=None, iam_channel_credentials=None):
super(TokenServiceCredentials, self).__init__()
- assert iam_token_service_pb2_grpc is not None, "run pip install==ydb[yc] to use service account credentials"
+ assert iam_token_service_pb2_grpc is not None, 'run pip install "ydb[yc]" to use service account credentials'
self._get_token_request_timeout = 10
self._iam_endpoint = "iam.api.cloud.yandex.net:443" if iam_endpoint is None else iam_endpoint
self._iam_channel_credentials = {} if iam_channel_credentials is None else iam_channel_credentials
diff --git a/ydb/public/sdk/python3/ydb/iam/auth.py b/ydb/public/sdk/python3/ydb/iam/auth.py
index 2623818d774..82e7c9f6c8e 100644
--- a/ydb/public/sdk/python3/ydb/iam/auth.py
+++ b/ydb/public/sdk/python3/ydb/iam/auth.py
@@ -45,7 +45,7 @@ def get_jwt(account_id, access_key_id, private_key, jwt_expiration_timeout):
class TokenServiceCredentials(credentials.AbstractExpiringTokenCredentials):
def __init__(self, iam_endpoint=None, iam_channel_credentials=None, tracer=None):
super(TokenServiceCredentials, self).__init__(tracer)
- assert iam_token_service_pb2_grpc is not None, "run pip install==ydb[yc] to use service account credentials"
+ assert iam_token_service_pb2_grpc is not None, 'run pip install "ydb[yc]" to use service account credentials'
self._get_token_request_timeout = 10
self._iam_token_service_pb2 = iam_token_service_pb2
self._iam_token_service_pb2_grpc = iam_token_service_pb2_grpc
diff --git a/ydb/public/sdk/python3/ydb/issues.py b/ydb/public/sdk/python3/ydb/issues.py
index f15c475cc64..a489d4e0662 100644
--- a/ydb/public/sdk/python3/ydb/issues.py
+++ b/ydb/public/sdk/python3/ydb/issues.py
@@ -156,6 +156,11 @@ class SessionPoolEmpty(Error, queue.Empty):
status = StatusCode.SESSION_POOL_EMPTY
+class UnexpectedGrpcMessage(Error):
+ def __init__(self, message: str):
+ super().__init__(message)
+
+
def _format_issues(issues):
if not issues:
return ""
diff --git a/ydb/public/sdk/python3/ydb/topic.py b/ydb/public/sdk/python3/ydb/topic.py
index abf93903b7e..190f532908a 100644
--- a/ydb/public/sdk/python3/ydb/topic.py
+++ b/ydb/public/sdk/python3/ydb/topic.py
@@ -168,7 +168,7 @@ class TopicClientAsyncIO:
if not decoder_executor:
decoder_executor = self._executor
- args = locals()
+ args = locals().copy()
del args["self"]
settings = TopicReaderSettings(**args)
@@ -188,7 +188,7 @@ class TopicClientAsyncIO:
encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None,
encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
) -> TopicWriterAsyncIO:
- args = locals()
+ args = locals().copy()
del args["self"]
settings = TopicWriterSettings(**args)
diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py
index 5a43db3f46d..63ab73ab853 100644
--- a/ydb/public/sdk/python3/ydb/ydb_version.py
+++ b/ydb/public/sdk/python3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.3.3"
+VERSION = "3.3.4"