summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-kikimr-dev <[email protected]>2023-06-13 17:25:34 +0300
committerrobot-kikimr-dev <[email protected]>2023-06-13 17:25:34 +0300
commit83f5539e37a5f6a300eea1a929dcc8ece7cfffbb (patch)
tree9d084e1130e090209ba821b2c99a12e04156a83c
parent8d531a009b82140e46c87d0501b7ef6352defc43 (diff)
YDB SDK Sync from git
-rw-r--r--ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py6
-rw-r--r--ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py32
-rw-r--r--ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py3
-rw-r--r--ydb/public/sdk/python3/ydb/topic.py2
-rw-r--r--ydb/public/sdk/python3/ydb/ydb_version.py2
5 files changed, 35 insertions, 10 deletions
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py b/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py
index 4c209f27d43..1b767e7c999 100644
--- a/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py
+++ b/ydb/public/sdk/python3/ydb/_topic_reader/datatypes.py
@@ -82,6 +82,8 @@ class PartitionSession:
self._loop = None
def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter":
+ self._ensure_not_closed()
+
waiter = PartitionSession.CommitAckWaiter(end_offset, self._create_future())
if end_offset <= self.committed_offset:
waiter._finish_ok()
@@ -121,7 +123,7 @@ class PartitionSession:
return
self.state = PartitionSession.State.Stopped
- exception = topic_reader_asyncio.TopicReaderCommitToExpiredPartition()
+ exception = topic_reader_asyncio.PublicTopicReaderPartitionExpiredError()
for waiter in self._ack_waiters:
waiter._finish_error(exception)
@@ -131,7 +133,7 @@ class PartitionSession:
def _ensure_not_closed(self):
if self.state == PartitionSession.State.Stopped:
- raise topic_reader_asyncio.TopicReaderCommitToExpiredPartition()
+ raise topic_reader_asyncio.PublicTopicReaderPartitionExpiredError()
class State(enum.Enum):
Active = 1
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
index ebe7bd6b46c..facd08530ae 100644
--- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
+++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
@@ -39,7 +39,7 @@ class TopicReaderUnexpectedCodec(YdbError):
pass
-class TopicReaderCommitToExpiredPartition(TopicReaderError):
+class PublicTopicReaderPartitionExpiredError(TopicReaderError):
"""
Commit message when partition read session are dropped.
It is ok - the message/batch will not commit to server and will receive in other read session
@@ -114,15 +114,22 @@ class PublicAsyncIOReader:
Write commit message to a buffer.
For the method no way check the commit result
- (for example if lost connection - commits will not re-send and committed messages will receive again)
+ (for example if lost connection - commits will not re-send and committed messages will receive again).
"""
- self._reconnector.commit(batch)
+ try:
+ self._reconnector.commit(batch)
+ except PublicTopicReaderPartitionExpiredError:
+ pass
async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
"""
write commit message to a buffer and wait ack from the server.
use asyncio.wait_for for wait with timeout.
+
+ may raise ydb.TopicReaderPartitionExpiredError, the error mean reader partition closed from server
+ before receive commit ack. Message may be acked or not (if not - it will send in other read session,
+ to this or other reader).
"""
waiter = self._reconnector.commit(batch)
await waiter.future
@@ -174,6 +181,14 @@ class ReaderReconnector:
await asyncio.sleep(retry_info.sleep_timeout_seconds)
attempt += 1
+ finally:
+ if self._stream_reader is not None:
+ # noinspection PyBroadException
+ try:
+ await self._stream_reader.close()
+ except BaseException:
+ # supress any error on close stream reader
+ pass
async def wait_message(self):
while True:
@@ -348,6 +363,9 @@ class ReaderStream:
return batch
def receive_message_nowait(self):
+ if self._get_first_error():
+ raise self._get_first_error()
+
try:
batch = self._message_batches[0]
message = batch.pop_message()
@@ -355,7 +373,7 @@ class ReaderStream:
return None
if batch.empty():
- self._message_batches.popleft()
+ self.receive_batch_nowait()
return message
@@ -366,10 +384,10 @@ class ReaderStream:
raise TopicReaderError("reader can commit only self-produced messages")
if partition_session.reader_stream_id != self._id:
- raise TopicReaderCommitToExpiredPartition("commit messages after reconnect to server")
+ raise PublicTopicReaderPartitionExpiredError("commit messages after reconnect to server")
if partition_session.id not in self._partition_sessions:
- raise TopicReaderCommitToExpiredPartition("commit messages after server stop the partition read session")
+ raise PublicTopicReaderPartitionExpiredError("commit messages after server stop the partition read session")
commit_range = batch._commit_get_offsets_range()
waiter = partition_session.add_waiter(commit_range.end)
@@ -617,6 +635,7 @@ class ReaderStream:
async def close(self):
if self._closed:
return
+
self._closed = True
self._set_first_error(TopicReaderStreamClosedError())
@@ -625,6 +644,7 @@ class ReaderStream:
for session in self._partition_sessions.values():
session.close()
+ self._partition_sessions.clear()
for task in self._background_tasks:
task.cancel()
diff --git a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py
index d83187fc1b7..dd969c7e8e0 100644
--- a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py
+++ b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py
@@ -1,6 +1,7 @@
import asyncio
import concurrent.futures
import datetime
+import functools
import gzip
import typing
from collections import deque
@@ -75,7 +76,7 @@ class WriterAsyncIO:
if self._closed or self._loop.is_closed():
return
- self._loop.call_soon(self.close, False)
+ self._loop.call_soon(functools.partial(self.close, flush=False))
async def close(self, *, flush: bool = True):
if self._closed:
diff --git a/ydb/public/sdk/python3/ydb/topic.py b/ydb/public/sdk/python3/ydb/topic.py
index 190f532908a..d598fb7d2e1 100644
--- a/ydb/public/sdk/python3/ydb/topic.py
+++ b/ydb/public/sdk/python3/ydb/topic.py
@@ -13,6 +13,7 @@ __all__ = [
"TopicReaderAsyncIO",
"TopicReaderSelector",
"TopicReaderSettings",
+ "TopicReaderPartitionExpiredError",
"TopicStatWindow",
"TopicWriteResult",
"TopicWriter",
@@ -40,6 +41,7 @@ from ._topic_reader.topic_reader_sync import TopicReaderSync as TopicReader
from ._topic_reader.topic_reader_asyncio import (
PublicAsyncIOReader as TopicReaderAsyncIO,
+ PublicTopicReaderPartitionExpiredError as TopicReaderPartitionExpiredError,
)
from ._topic_writer.topic_writer import ( # noqa: F401
diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py
index 63ab73ab853..9a114d2031a 100644
--- a/ydb/public/sdk/python3/ydb/ydb_version.py
+++ b/ydb/public/sdk/python3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.3.4"
+VERSION = "3.3.5"