diff options
author | rekby <rekby@ydb.tech> | 2023-03-21 22:02:56 +0300 |
---|---|---|
committer | rekby <rekby@ydb.tech> | 2023-03-21 22:02:56 +0300 |
commit | f2900afd402bd86173b9e8e419b3ae70e51c5e18 (patch) | |
tree | 1c9f3ef4999254adbd2bae5dfeb35fd1bb5ccb7c | |
parent | 8d763f3ce0e3793a611ef7bbcad00cc86ef11252 (diff) | |
download | ydb-f2900afd402bd86173b9e8e419b3ae70e51c5e18.tar.gz |
add receive one message, swap topic reader args (fix api bug)
5 files changed, 54 insertions, 19 deletions
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py b/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py index 3845995fcf..5376c76dc1 100644 --- a/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py +++ b/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py @@ -179,6 +179,9 @@ class PublicBatch(ICommittable, ISessionAlive): self.messages[-1]._commit_get_offsets_range().end, ) + def empty(self) -> bool: + return len(self.messages) == 0 + # ISessionAlive implementation @property def is_alive(self) -> bool: @@ -187,3 +190,6 @@ class PublicBatch(ICommittable, ISessionAlive): state == PartitionSession.State.Active or state == PartitionSession.State.GracefulShutdown ) + + def pop_message(self) -> PublicMessage: + return self.messages.pop(0) diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py index bb87d3ccc8..0068e4ba2f 100644 --- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py @@ -95,14 +95,6 @@ class PublicAsyncIOReader: """ raise NotImplementedError() - async def receive_message(self) -> typing.Union[topic_reader.PublicMessage, None]: - """ - Block until receive new message - - use asyncio.wait_for for wait with timeout. - """ - raise NotImplementedError() - def batches( self, *, @@ -133,6 +125,15 @@ class PublicAsyncIOReader: await self._reconnector.wait_message() return self._reconnector.receive_batch_nowait() + async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]: + """ + Block until receive new message + + use asyncio.wait_for for wait with timeout. + """ + await self._reconnector.wait_message() + return self._reconnector.receive_message_nowait() + async def commit_on_exit( self, mess: datatypes.ICommittable ) -> typing.AsyncContextManager: @@ -244,6 +245,9 @@ class ReaderReconnector: def receive_batch_nowait(self): return self._stream_reader.receive_batch_nowait() + def receive_message_nowait(self): + return self._stream_reader.receive_message_nowait() + def commit( self, batch: datatypes.ICommittable ) -> datatypes.PartitionSession.CommitAckWaiter: @@ -397,12 +401,24 @@ class ReaderStream: raise self._get_first_error() if not self._message_batches: - return + return None batch = self._message_batches.popleft() self._buffer_release_bytes(batch._bytes_size) return batch + def receive_message_nowait(self): + try: + batch = self._message_batches[0] + message = batch.pop_message() + except IndexError: + return None + + if batch.empty(): + self._message_batches.popleft() + + return message + def commit( self, batch: datatypes.ICommittable ) -> datatypes.PartitionSession.CommitAckWaiter: diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py index 30bf92a10e..ed9730fafd 100644 --- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py @@ -83,19 +83,28 @@ class TopicReaderSync: It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration - if timeout <= 0 - it will fast non block method, get messages from internal buffer only. + if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses, + get messages from internal buffer only. """ raise NotImplementedError() - def receive_message(self, *, timeout: Union[float, None] = None) -> PublicMessage: + def receive_message( + self, *, timeout: TimeoutType = None + ) -> datatypes.PublicMessage: """ Block until receive new message It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. + receive_message(timeout=0) may return None even right after async_wait_message() is ok - because lost of partition + or connection to server lost if no new message in timeout seconds (default - infinite): raise TimeoutError() - if timeout <= 0 - it will fast non block method, get messages from internal buffer only. + if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses, get messages from internal buffer only. """ - raise NotImplementedError() + self._check_closed() + + return self._caller.safe_call_with_result( + self._async_reader.receive_message(), timeout + ) def async_wait_message(self) -> concurrent.futures.Future: """ @@ -105,7 +114,11 @@ class TopicReaderSync: Possible situation when receive signal about message available, but no messages when try to receive a message. If message expired between send event and try to retrieve message (for example connection broken). """ - raise NotImplementedError() + self._check_closed() + + return self._caller.unsafe_call_with_future( + self._async_reader._reconnector.wait_message() + ) def batches( self, @@ -119,7 +132,7 @@ class TopicReaderSync: It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration - if timeout <= 0 - it will fast non block method, get messages from internal buffer only. + if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses, get messages from internal buffer only. """ raise NotImplementedError() @@ -135,7 +148,7 @@ class TopicReaderSync: It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. if no new message in timeout seconds (default - infinite): raise TimeoutError() - if timeout <= 0 - it will fast non block method, get messages from internal buffer only. + if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses, get messages from internal buffer only. """ self._check_closed() diff --git a/ydb/public/sdk/python3/ydb/topic.py b/ydb/public/sdk/python3/ydb/topic.py index efe62219cb..ae6b5a5b40 100644 --- a/ydb/public/sdk/python3/ydb/topic.py +++ b/ydb/public/sdk/python3/ydb/topic.py @@ -137,8 +137,8 @@ class TopicClientAsyncIO: def reader( self, - consumer: str, topic: str, + consumer: str, buffer_size_bytes: int = 50 * 1024 * 1024, # decoders: map[codec_code] func(encoded_bytes)->decoded_bytes decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None, @@ -306,8 +306,8 @@ class TopicClient: def reader( self, - consumer: str, topic: str, + consumer: str, buffer_size_bytes: int = 50 * 1024 * 1024, # decoders: map[codec_code] func(encoded_bytes)->decoded_bytes decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None, diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py index ef5ee52a9b..9b3d0a8cdd 100644 --- a/ydb/public/sdk/python3/ydb/ydb_version.py +++ b/ydb/public/sdk/python3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.0.1b9" +VERSION = "3.0.1b11" |