diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-04-26 12:55:31 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-04-26 13:05:51 +0300 |
commit | c455f2fcfbdb29b94e5555c43149f4fd10692e11 (patch) | |
tree | 43ea71ff4bfb9109b0266916f92d5fcb7c07097a | |
parent | 5bbe44ff4e12b6d5496d56ecca97b0c4db340509 (diff) | |
download | ydb-c455f2fcfbdb29b94e5555c43149f4fd10692e11.tar.gz |
Intermediate changes
-rw-r--r-- | contrib/python/ydb/py3/.dist-info/METADATA | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ya.make | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py | 33 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/topic.py | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/ydb_version.py | 2 |
5 files changed, 28 insertions, 13 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 6db8a9f068..31e64bdc6f 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.11.1 +Version: 3.11.2 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index bc2089bef2..99b2cc9f4d 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.11.1) +VERSION(3.11.2) LICENSE(Apache-2.0) diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py index 50684f7cf9..7b3d1cfa10 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py @@ -8,6 +8,7 @@ from asyncio import Task from collections import deque from typing import Optional, Set, Dict, Union, Callable +import ydb from .. import _apis, issues from .._utilities import AtomicCounter from ..aio import Driver @@ -35,7 +36,7 @@ class TopicReaderError(YdbError): pass -class TopicReaderUnexpectedCodec(YdbError): +class PublicTopicReaderUnexpectedCodecError(YdbError): pass @@ -222,9 +223,7 @@ class ReaderReconnector: async def close(self, flush: bool): if self._stream_reader: - if flush: - await self.flush() - await self._stream_reader.close() + await self._stream_reader.close(flush) for task in self._background_tasks: task.cancel() @@ -339,9 +338,12 @@ class ReaderStream: self._update_token_event.set() self._background_tasks.add(asyncio.create_task(self._read_messages_loop(), name="read_messages_loop")) - self._background_tasks.add(asyncio.create_task(self._decode_batches_loop())) + self._background_tasks.add(asyncio.create_task(self._decode_batches_loop(), name="decode_batches")) if self._get_token_function: self._background_tasks.add(asyncio.create_task(self._update_token_loop(), name="update_token_loop")) + self._background_tasks.add( + asyncio.create_task(self._handle_background_errors(), name="handle_background_errors") + ) async def wait_error(self): raise await self._first_error @@ -411,6 +413,17 @@ class ReaderStream: return waiter + async def _handle_background_errors(self): + done, _ = await asyncio.wait(self._background_tasks, return_when=asyncio.FIRST_EXCEPTION) + for f in done: + f = f # type: asyncio.Future + err = f.exception() + if not isinstance(err, ydb.Error): + old_err = err + err = ydb.Error("Background process failed unexpected") + err.__cause__ = old_err + self._set_first_error(err) + async def _read_messages_loop(self): try: self._stream.write( @@ -602,7 +615,7 @@ class ReaderStream: try: decode_func = self._decoders[batch._codec] except KeyError: - raise TopicReaderUnexpectedCodec("Receive message with unexpected codec: %s" % batch._codec) + raise PublicTopicReaderUnexpectedCodecError("Receive message with unexpected codec: %s" % batch._codec) decode_data_futures = [] for message in batch.messages: @@ -628,9 +641,6 @@ class ReaderStream: return self._first_error.result() async def flush(self): - if self._closed: - raise RuntimeError("Flush on closed Stream") - futures = [] for session in self._partition_sessions.values(): futures.extend(w.future for w in session._ack_waiters) @@ -638,12 +648,15 @@ class ReaderStream: if futures: await asyncio.wait(futures) - async def close(self): + async def close(self, flush: bool): if self._closed: return self._closed = True + if flush: + await self.flush() + self._set_first_error(TopicReaderStreamClosedError()) self._state_changed.set() self._stream.close() diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py index 2175af47f7..948bcff4cf 100644 --- a/contrib/python/ydb/py3/ydb/topic.py +++ b/contrib/python/ydb/py3/ydb/topic.py @@ -15,6 +15,7 @@ __all__ = [ "TopicReaderMessage", "TopicReaderSelector", "TopicReaderSettings", + "TopicReaderUnexpectedCodecError", "TopicReaderPartitionExpiredError", "TopicStatWindow", "TopicWriteResult", @@ -49,6 +50,7 @@ from ._topic_reader.topic_reader_sync import TopicReaderSync as TopicReader from ._topic_reader.topic_reader_asyncio import ( PublicAsyncIOReader as TopicReaderAsyncIO, PublicTopicReaderPartitionExpiredError as TopicReaderPartitionExpiredError, + PublicTopicReaderUnexpectedCodecError as TopicReaderUnexpectedCodecError, ) from ._topic_writer.topic_writer import ( # noqa: F401 diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index 03dc246c4e..a5b8a0a4e9 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.11.1" +VERSION = "3.11.2" |