diff options
author | robot-kikimr-dev <[email protected]> | 2023-06-13 17:25:34 +0300 |
---|---|---|
committer | robot-kikimr-dev <[email protected]> | 2023-06-13 17:25:34 +0300 |
commit | 83f5539e37a5f6a300eea1a929dcc8ece7cfffbb (patch) | |
tree | 9d084e1130e090209ba821b2c99a12e04156a83c | |
parent | 8d531a009b82140e46c87d0501b7ef6352defc43 (diff) |
YDB SDK Sync from git
5 files changed, 35 insertions, 10 deletions
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py b/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py index 4c209f27d43..1b767e7c999 100644 --- a/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py +++ b/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py @@ -82,6 +82,8 @@ class PartitionSession: self._loop = None def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter": + self._ensure_not_closed() + waiter = PartitionSession.CommitAckWaiter(end_offset, self._create_future()) if end_offset <= self.committed_offset: waiter._finish_ok() @@ -121,7 +123,7 @@ class PartitionSession: return self.state = PartitionSession.State.Stopped - exception = topic_reader_asyncio.TopicReaderCommitToExpiredPartition() + exception = topic_reader_asyncio.PublicTopicReaderPartitionExpiredError() for waiter in self._ack_waiters: waiter._finish_error(exception) @@ -131,7 +133,7 @@ class PartitionSession: def _ensure_not_closed(self): if self.state == PartitionSession.State.Stopped: - raise topic_reader_asyncio.TopicReaderCommitToExpiredPartition() + raise topic_reader_asyncio.PublicTopicReaderPartitionExpiredError() class State(enum.Enum): Active = 1 diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py index ebe7bd6b46c..facd08530ae 100644 --- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py @@ -39,7 +39,7 @@ class TopicReaderUnexpectedCodec(YdbError): pass -class TopicReaderCommitToExpiredPartition(TopicReaderError): +class PublicTopicReaderPartitionExpiredError(TopicReaderError): """ Commit message when partition read session are dropped. It is ok - the message/batch will not commit to server and will receive in other read session @@ -114,15 +114,22 @@ class PublicAsyncIOReader: Write commit message to a buffer. For the method no way check the commit result - (for example if lost connection - commits will not re-send and committed messages will receive again) + (for example if lost connection - commits will not re-send and committed messages will receive again). """ - self._reconnector.commit(batch) + try: + self._reconnector.commit(batch) + except PublicTopicReaderPartitionExpiredError: + pass async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]): """ write commit message to a buffer and wait ack from the server. use asyncio.wait_for for wait with timeout. + + may raise ydb.TopicReaderPartitionExpiredError, the error mean reader partition closed from server + before receive commit ack. Message may be acked or not (if not - it will send in other read session, + to this or other reader). """ waiter = self._reconnector.commit(batch) await waiter.future @@ -174,6 +181,14 @@ class ReaderReconnector: await asyncio.sleep(retry_info.sleep_timeout_seconds) attempt += 1 + finally: + if self._stream_reader is not None: + # noinspection PyBroadException + try: + await self._stream_reader.close() + except BaseException: + # supress any error on close stream reader + pass async def wait_message(self): while True: @@ -348,6 +363,9 @@ class ReaderStream: return batch def receive_message_nowait(self): + if self._get_first_error(): + raise self._get_first_error() + try: batch = self._message_batches[0] message = batch.pop_message() @@ -355,7 +373,7 @@ class ReaderStream: return None if batch.empty(): - self._message_batches.popleft() + self.receive_batch_nowait() return message @@ -366,10 +384,10 @@ class ReaderStream: raise TopicReaderError("reader can commit only self-produced messages") if partition_session.reader_stream_id != self._id: - raise TopicReaderCommitToExpiredPartition("commit messages after reconnect to server") + raise PublicTopicReaderPartitionExpiredError("commit messages after reconnect to server") if partition_session.id not in self._partition_sessions: - raise TopicReaderCommitToExpiredPartition("commit messages after server stop the partition read session") + raise PublicTopicReaderPartitionExpiredError("commit messages after server stop the partition read session") commit_range = batch._commit_get_offsets_range() waiter = partition_session.add_waiter(commit_range.end) @@ -617,6 +635,7 @@ class ReaderStream: async def close(self): if self._closed: return + self._closed = True self._set_first_error(TopicReaderStreamClosedError()) @@ -625,6 +644,7 @@ class ReaderStream: for session in self._partition_sessions.values(): session.close() + self._partition_sessions.clear() for task in self._background_tasks: task.cancel() diff --git a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py index d83187fc1b7..dd969c7e8e0 100644 --- a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py @@ -1,6 +1,7 @@ import asyncio import concurrent.futures import datetime +import functools import gzip import typing from collections import deque @@ -75,7 +76,7 @@ class WriterAsyncIO: if self._closed or self._loop.is_closed(): return - self._loop.call_soon(self.close, False) + self._loop.call_soon(functools.partial(self.close, flush=False)) async def close(self, *, flush: bool = True): if self._closed: diff --git a/ydb/public/sdk/python3/ydb/topic.py b/ydb/public/sdk/python3/ydb/topic.py index 190f532908a..d598fb7d2e1 100644 --- a/ydb/public/sdk/python3/ydb/topic.py +++ b/ydb/public/sdk/python3/ydb/topic.py @@ -13,6 +13,7 @@ __all__ = [ "TopicReaderAsyncIO", "TopicReaderSelector", "TopicReaderSettings", + "TopicReaderPartitionExpiredError", "TopicStatWindow", "TopicWriteResult", "TopicWriter", @@ -40,6 +41,7 @@ from ._topic_reader.topic_reader_sync import TopicReaderSync as TopicReader from ._topic_reader.topic_reader_asyncio import ( PublicAsyncIOReader as TopicReaderAsyncIO, + PublicTopicReaderPartitionExpiredError as TopicReaderPartitionExpiredError, ) from ._topic_writer.topic_writer import ( # noqa: F401 diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py index 63ab73ab853..9a114d2031a 100644 --- a/ydb/public/sdk/python3/ydb/ydb_version.py +++ b/ydb/public/sdk/python3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.3.4" +VERSION = "3.3.5" |