aboutsummaryrefslogtreecommitdiffstats
path: root/contrib
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-04-26 12:55:31 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-04-26 13:05:51 +0300
commitc455f2fcfbdb29b94e5555c43149f4fd10692e11 (patch)
tree43ea71ff4bfb9109b0266916f92d5fcb7c07097a /contrib
parent5bbe44ff4e12b6d5496d56ecca97b0c4db340509 (diff)
downloadydb-c455f2fcfbdb29b94e5555c43149f4fd10692e11.tar.gz
Intermediate changes
Diffstat (limited to 'contrib')
-rw-r--r--contrib/python/ydb/py3/.dist-info/METADATA2
-rw-r--r--contrib/python/ydb/py3/ya.make2
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py33
-rw-r--r--contrib/python/ydb/py3/ydb/topic.py2
-rw-r--r--contrib/python/ydb/py3/ydb/ydb_version.py2
5 files changed, 28 insertions, 13 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA
index 6db8a9f068..31e64bdc6f 100644
--- a/contrib/python/ydb/py3/.dist-info/METADATA
+++ b/contrib/python/ydb/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
-Version: 3.11.1
+Version: 3.11.2
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make
index bc2089bef2..99b2cc9f4d 100644
--- a/contrib/python/ydb/py3/ya.make
+++ b/contrib/python/ydb/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.11.1)
+VERSION(3.11.2)
LICENSE(Apache-2.0)
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
index 50684f7cf9..7b3d1cfa10 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
@@ -8,6 +8,7 @@ from asyncio import Task
from collections import deque
from typing import Optional, Set, Dict, Union, Callable
+import ydb
from .. import _apis, issues
from .._utilities import AtomicCounter
from ..aio import Driver
@@ -35,7 +36,7 @@ class TopicReaderError(YdbError):
pass
-class TopicReaderUnexpectedCodec(YdbError):
+class PublicTopicReaderUnexpectedCodecError(YdbError):
pass
@@ -222,9 +223,7 @@ class ReaderReconnector:
async def close(self, flush: bool):
if self._stream_reader:
- if flush:
- await self.flush()
- await self._stream_reader.close()
+ await self._stream_reader.close(flush)
for task in self._background_tasks:
task.cancel()
@@ -339,9 +338,12 @@ class ReaderStream:
self._update_token_event.set()
self._background_tasks.add(asyncio.create_task(self._read_messages_loop(), name="read_messages_loop"))
- self._background_tasks.add(asyncio.create_task(self._decode_batches_loop()))
+ self._background_tasks.add(asyncio.create_task(self._decode_batches_loop(), name="decode_batches"))
if self._get_token_function:
self._background_tasks.add(asyncio.create_task(self._update_token_loop(), name="update_token_loop"))
+ self._background_tasks.add(
+ asyncio.create_task(self._handle_background_errors(), name="handle_background_errors")
+ )
async def wait_error(self):
raise await self._first_error
@@ -411,6 +413,17 @@ class ReaderStream:
return waiter
+ async def _handle_background_errors(self):
+ done, _ = await asyncio.wait(self._background_tasks, return_when=asyncio.FIRST_EXCEPTION)
+ for f in done:
+ f = f # type: asyncio.Future
+ err = f.exception()
+ if not isinstance(err, ydb.Error):
+ old_err = err
+ err = ydb.Error("Background process failed unexpected")
+ err.__cause__ = old_err
+ self._set_first_error(err)
+
async def _read_messages_loop(self):
try:
self._stream.write(
@@ -602,7 +615,7 @@ class ReaderStream:
try:
decode_func = self._decoders[batch._codec]
except KeyError:
- raise TopicReaderUnexpectedCodec("Receive message with unexpected codec: %s" % batch._codec)
+ raise PublicTopicReaderUnexpectedCodecError("Receive message with unexpected codec: %s" % batch._codec)
decode_data_futures = []
for message in batch.messages:
@@ -628,9 +641,6 @@ class ReaderStream:
return self._first_error.result()
async def flush(self):
- if self._closed:
- raise RuntimeError("Flush on closed Stream")
-
futures = []
for session in self._partition_sessions.values():
futures.extend(w.future for w in session._ack_waiters)
@@ -638,12 +648,15 @@ class ReaderStream:
if futures:
await asyncio.wait(futures)
- async def close(self):
+ async def close(self, flush: bool):
if self._closed:
return
self._closed = True
+ if flush:
+ await self.flush()
+
self._set_first_error(TopicReaderStreamClosedError())
self._state_changed.set()
self._stream.close()
diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py
index 2175af47f7..948bcff4cf 100644
--- a/contrib/python/ydb/py3/ydb/topic.py
+++ b/contrib/python/ydb/py3/ydb/topic.py
@@ -15,6 +15,7 @@ __all__ = [
"TopicReaderMessage",
"TopicReaderSelector",
"TopicReaderSettings",
+ "TopicReaderUnexpectedCodecError",
"TopicReaderPartitionExpiredError",
"TopicStatWindow",
"TopicWriteResult",
@@ -49,6 +50,7 @@ from ._topic_reader.topic_reader_sync import TopicReaderSync as TopicReader
from ._topic_reader.topic_reader_asyncio import (
PublicAsyncIOReader as TopicReaderAsyncIO,
PublicTopicReaderPartitionExpiredError as TopicReaderPartitionExpiredError,
+ PublicTopicReaderUnexpectedCodecError as TopicReaderUnexpectedCodecError,
)
from ._topic_writer.topic_writer import ( # noqa: F401
diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py
index 03dc246c4e..a5b8a0a4e9 100644
--- a/contrib/python/ydb/py3/ydb/ydb_version.py
+++ b/contrib/python/ydb/py3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.11.1"
+VERSION = "3.11.2"