aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrekby <rekby@ydb.tech>2023-03-21 22:02:56 +0300
committerrekby <rekby@ydb.tech>2023-03-21 22:02:56 +0300
commitf2900afd402bd86173b9e8e419b3ae70e51c5e18 (patch)
tree1c9f3ef4999254adbd2bae5dfeb35fd1bb5ccb7c
parent8d763f3ce0e3793a611ef7bbcad00cc86ef11252 (diff)
downloadydb-f2900afd402bd86173b9e8e419b3ae70e51c5e18.tar.gz
add receive one message, swap topic reader args (fix api bug)
-rw-r--r--ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py6
-rw-r--r--ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py34
-rw-r--r--ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py27
-rw-r--r--ydb/public/sdk/python3/ydb/topic.py4
-rw-r--r--ydb/public/sdk/python3/ydb/ydb_version.py2
5 files changed, 54 insertions, 19 deletions
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py b/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py
index 3845995fcf..5376c76dc1 100644
--- a/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py
+++ b/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py
@@ -179,6 +179,9 @@ class PublicBatch(ICommittable, ISessionAlive):
self.messages[-1]._commit_get_offsets_range().end,
)
+ def empty(self) -> bool:
+ return len(self.messages) == 0
+
# ISessionAlive implementation
@property
def is_alive(self) -> bool:
@@ -187,3 +190,6 @@ class PublicBatch(ICommittable, ISessionAlive):
state == PartitionSession.State.Active
or state == PartitionSession.State.GracefulShutdown
)
+
+ def pop_message(self) -> PublicMessage:
+ return self.messages.pop(0)
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
index bb87d3ccc8..0068e4ba2f 100644
--- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
+++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
@@ -95,14 +95,6 @@ class PublicAsyncIOReader:
"""
raise NotImplementedError()
- async def receive_message(self) -> typing.Union[topic_reader.PublicMessage, None]:
- """
- Block until receive new message
-
- use asyncio.wait_for for wait with timeout.
- """
- raise NotImplementedError()
-
def batches(
self,
*,
@@ -133,6 +125,15 @@ class PublicAsyncIOReader:
await self._reconnector.wait_message()
return self._reconnector.receive_batch_nowait()
+ async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]:
+ """
+ Block until receive new message
+
+ use asyncio.wait_for for wait with timeout.
+ """
+ await self._reconnector.wait_message()
+ return self._reconnector.receive_message_nowait()
+
async def commit_on_exit(
self, mess: datatypes.ICommittable
) -> typing.AsyncContextManager:
@@ -244,6 +245,9 @@ class ReaderReconnector:
def receive_batch_nowait(self):
return self._stream_reader.receive_batch_nowait()
+ def receive_message_nowait(self):
+ return self._stream_reader.receive_message_nowait()
+
def commit(
self, batch: datatypes.ICommittable
) -> datatypes.PartitionSession.CommitAckWaiter:
@@ -397,12 +401,24 @@ class ReaderStream:
raise self._get_first_error()
if not self._message_batches:
- return
+ return None
batch = self._message_batches.popleft()
self._buffer_release_bytes(batch._bytes_size)
return batch
+ def receive_message_nowait(self):
+ try:
+ batch = self._message_batches[0]
+ message = batch.pop_message()
+ except IndexError:
+ return None
+
+ if batch.empty():
+ self._message_batches.popleft()
+
+ return message
+
def commit(
self, batch: datatypes.ICommittable
) -> datatypes.PartitionSession.CommitAckWaiter:
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py
index 30bf92a10e..ed9730fafd 100644
--- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py
+++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py
@@ -83,19 +83,28 @@ class TopicReaderSync:
It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration
- if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
+ if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses,
+ get messages from internal buffer only.
"""
raise NotImplementedError()
- def receive_message(self, *, timeout: Union[float, None] = None) -> PublicMessage:
+ def receive_message(
+ self, *, timeout: TimeoutType = None
+ ) -> datatypes.PublicMessage:
"""
Block until receive new message
It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
+ receive_message(timeout=0) may return None even right after async_wait_message() is ok - because lost of partition
+ or connection to server lost
if no new message in timeout seconds (default - infinite): raise TimeoutError()
- if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
+ if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses, get messages from internal buffer only.
"""
- raise NotImplementedError()
+ self._check_closed()
+
+ return self._caller.safe_call_with_result(
+ self._async_reader.receive_message(), timeout
+ )
def async_wait_message(self) -> concurrent.futures.Future:
"""
@@ -105,7 +114,11 @@ class TopicReaderSync:
Possible situation when receive signal about message available, but no messages when try to receive a message.
If message expired between send event and try to retrieve message (for example connection broken).
"""
- raise NotImplementedError()
+ self._check_closed()
+
+ return self._caller.unsafe_call_with_future(
+ self._async_reader._reconnector.wait_message()
+ )
def batches(
self,
@@ -119,7 +132,7 @@ class TopicReaderSync:
It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration
- if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
+ if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses, get messages from internal buffer only.
"""
raise NotImplementedError()
@@ -135,7 +148,7 @@ class TopicReaderSync:
It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
if no new message in timeout seconds (default - infinite): raise TimeoutError()
- if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
+ if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses, get messages from internal buffer only.
"""
self._check_closed()
diff --git a/ydb/public/sdk/python3/ydb/topic.py b/ydb/public/sdk/python3/ydb/topic.py
index efe62219cb..ae6b5a5b40 100644
--- a/ydb/public/sdk/python3/ydb/topic.py
+++ b/ydb/public/sdk/python3/ydb/topic.py
@@ -137,8 +137,8 @@ class TopicClientAsyncIO:
def reader(
self,
- consumer: str,
topic: str,
+ consumer: str,
buffer_size_bytes: int = 50 * 1024 * 1024,
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
@@ -306,8 +306,8 @@ class TopicClient:
def reader(
self,
- consumer: str,
topic: str,
+ consumer: str,
buffer_size_bytes: int = 50 * 1024 * 1024,
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py
index ef5ee52a9b..9b3d0a8cdd 100644
--- a/ydb/public/sdk/python3/ydb/ydb_version.py
+++ b/ydb/public/sdk/python3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.0.1b9"
+VERSION = "3.0.1b11"