diff options
author | robot-kikimr-dev <[email protected]> | 2023-08-21 16:30:07 +0300 |
---|---|---|
committer | robot-kikimr-dev <[email protected]> | 2023-08-21 17:01:37 +0300 |
commit | ed563c45a30cc549e23e0b672f0e890fa6849361 (patch) | |
tree | 88f53ef7bdeecf5502f4471dd68aa249b7c1b948 | |
parent | 460cbb4a7bab45581fa4cd17d3750e1374db2863 (diff) |
YDB SDK Sync from git
5 files changed, 14 insertions, 15 deletions
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py b/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py index 1b767e7c999..28155ea7a34 100644 --- a/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py +++ b/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py @@ -71,20 +71,14 @@ class PartitionSession: _ack_waiters: Deque["PartitionSession.CommitAckWaiter"] = field(init=False, default_factory=lambda: deque()) _state_changed: asyncio.Event = field(init=False, default_factory=lambda: asyncio.Event(), compare=False) - _loop: Optional[asyncio.AbstractEventLoop] = field(init=False) # may be None in tests def __post_init__(self): self._next_message_start_commit_offset = self.committed_offset - try: - self._loop = asyncio.get_running_loop() - except RuntimeError: - self._loop = None - def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter": self._ensure_not_closed() - waiter = PartitionSession.CommitAckWaiter(end_offset, self._create_future()) + waiter = PartitionSession.CommitAckWaiter(end_offset, asyncio.Future()) if end_offset <= self.committed_offset: waiter._finish_ok() return waiter @@ -97,11 +91,6 @@ class PartitionSession: return waiter - def _create_future(self) -> asyncio.Future: - if self._loop: - return self._loop.create_future() - return asyncio.Future() - def ack_notify(self, offset: int): self._ensure_not_closed() diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py index facd08530ae..50684f7cf97 100644 --- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py @@ -88,6 +88,12 @@ class PublicAsyncIOReader: if not self._closed: self._loop.create_task(self.close(flush=False), name="close reader") + async def wait_message(self): + """ + Wait at least one message from reader. + """ + await self._reconnector.wait_message() + async def receive_batch( self, ) -> typing.Union[datatypes.PublicBatch, None]: diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py index 5c8db630c73..e5b4e1a2b42 100644 --- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py @@ -84,7 +84,7 @@ class TopicReaderSync: """ self._check_closed() - return self._caller.unsafe_call_with_future(self._async_reader._reconnector.wait_message()) + return self._caller.unsafe_call_with_future(self._async_reader.wait_message()) def receive_batch( self, diff --git a/ydb/public/sdk/python3/ydb/topic.py b/ydb/public/sdk/python3/ydb/topic.py index e0d3cf2dd1b..00ffb1c479a 100644 --- a/ydb/public/sdk/python3/ydb/topic.py +++ b/ydb/public/sdk/python3/ydb/topic.py @@ -11,6 +11,7 @@ __all__ = [ "TopicMeteringMode", "TopicReader", "TopicReaderAsyncIO", + "TopicReaderBatch", "TopicReaderMessage", "TopicReaderSelector", "TopicReaderSettings", @@ -33,7 +34,10 @@ from . import aio, Credentials, _apis, issues from . import driver -from ._topic_reader.datatypes import PublicMessage as TopicReaderMessage +from ._topic_reader.datatypes import ( + PublicBatch as TopicReaderBatch, + PublicMessage as TopicReaderMessage, +) from ._topic_reader.topic_reader import ( PublicReaderSettings as TopicReaderSettings, diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py index 78ab3284ca2..84d3799eff0 100644 --- a/ydb/public/sdk/python3/ydb/ydb_version.py +++ b/ydb/public/sdk/python3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.3.7" +VERSION = "3.4.0" |