summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-kikimr-dev <[email protected]>2023-08-21 16:30:07 +0300
committerrobot-kikimr-dev <[email protected]>2023-08-21 17:01:37 +0300
commited563c45a30cc549e23e0b672f0e890fa6849361 (patch)
tree88f53ef7bdeecf5502f4471dd68aa249b7c1b948
parent460cbb4a7bab45581fa4cd17d3750e1374db2863 (diff)
YDB SDK Sync from git
-rw-r--r--ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py13
-rw-r--r--ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py6
-rw-r--r--ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py2
-rw-r--r--ydb/public/sdk/python3/ydb/topic.py6
-rw-r--r--ydb/public/sdk/python3/ydb/ydb_version.py2
5 files changed, 14 insertions, 15 deletions
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py b/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py
index 1b767e7c999..28155ea7a34 100644
--- a/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py
+++ b/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py
@@ -71,20 +71,14 @@ class PartitionSession:
_ack_waiters: Deque["PartitionSession.CommitAckWaiter"] = field(init=False, default_factory=lambda: deque())
_state_changed: asyncio.Event = field(init=False, default_factory=lambda: asyncio.Event(), compare=False)
- _loop: Optional[asyncio.AbstractEventLoop] = field(init=False) # may be None in tests
def __post_init__(self):
self._next_message_start_commit_offset = self.committed_offset
- try:
- self._loop = asyncio.get_running_loop()
- except RuntimeError:
- self._loop = None
-
def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter":
self._ensure_not_closed()
- waiter = PartitionSession.CommitAckWaiter(end_offset, self._create_future())
+ waiter = PartitionSession.CommitAckWaiter(end_offset, asyncio.Future())
if end_offset <= self.committed_offset:
waiter._finish_ok()
return waiter
@@ -97,11 +91,6 @@ class PartitionSession:
return waiter
- def _create_future(self) -> asyncio.Future:
- if self._loop:
- return self._loop.create_future()
- return asyncio.Future()
-
def ack_notify(self, offset: int):
self._ensure_not_closed()
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
index facd08530ae..50684f7cf97 100644
--- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
+++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
@@ -88,6 +88,12 @@ class PublicAsyncIOReader:
if not self._closed:
self._loop.create_task(self.close(flush=False), name="close reader")
+ async def wait_message(self):
+ """
+ Wait at least one message from reader.
+ """
+ await self._reconnector.wait_message()
+
async def receive_batch(
self,
) -> typing.Union[datatypes.PublicBatch, None]:
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py
index 5c8db630c73..e5b4e1a2b42 100644
--- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py
+++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py
@@ -84,7 +84,7 @@ class TopicReaderSync:
"""
self._check_closed()
- return self._caller.unsafe_call_with_future(self._async_reader._reconnector.wait_message())
+ return self._caller.unsafe_call_with_future(self._async_reader.wait_message())
def receive_batch(
self,
diff --git a/ydb/public/sdk/python3/ydb/topic.py b/ydb/public/sdk/python3/ydb/topic.py
index e0d3cf2dd1b..00ffb1c479a 100644
--- a/ydb/public/sdk/python3/ydb/topic.py
+++ b/ydb/public/sdk/python3/ydb/topic.py
@@ -11,6 +11,7 @@ __all__ = [
"TopicMeteringMode",
"TopicReader",
"TopicReaderAsyncIO",
+ "TopicReaderBatch",
"TopicReaderMessage",
"TopicReaderSelector",
"TopicReaderSettings",
@@ -33,7 +34,10 @@ from . import aio, Credentials, _apis, issues
from . import driver
-from ._topic_reader.datatypes import PublicMessage as TopicReaderMessage
+from ._topic_reader.datatypes import (
+ PublicBatch as TopicReaderBatch,
+ PublicMessage as TopicReaderMessage,
+)
from ._topic_reader.topic_reader import (
PublicReaderSettings as TopicReaderSettings,
diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py
index 78ab3284ca2..84d3799eff0 100644
--- a/ydb/public/sdk/python3/ydb/ydb_version.py
+++ b/ydb/public/sdk/python3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.3.7"
+VERSION = "3.4.0"