diff options
| author | robot-piglet <[email protected]> | 2025-01-23 12:01:52 +0300 |
|---|---|---|
| committer | robot-piglet <[email protected]> | 2025-01-23 12:42:32 +0300 |
| commit | 46897b2fced76e53beb9697beef059641df6d1a7 (patch) | |
| tree | 3cb9b20af890c2ccc9b7f1da888aac9c80e6dcb4 /contrib/python | |
| parent | bbb074a7dde8ea7635f0b7e99813d1f82c356738 (diff) | |
Intermediate changes
commit_hash:8ae2695b8328c208689a529daeadc39dade0aacf
Diffstat (limited to 'contrib/python')
8 files changed, 37 insertions, 16 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 391e0b3f96f..181a3da9c53 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.18.14 +Version: 3.18.15 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index e0f0e9ce8a7..1da9bb152cd 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.18.14) +VERSION(3.18.15) LICENSE(Apache-2.0) diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py index 634ffb536b6..972003989c6 100644 --- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -207,6 +207,7 @@ class StreamWriteMessage: data: bytes uncompressed_size: int partitioning: "StreamWriteMessage.PartitioningType" + metadata_items: Dict[str, bytes] def to_proto( self, @@ -217,6 +218,10 @@ class StreamWriteMessage: proto.data = self.data proto.uncompressed_size = self.uncompressed_size + for key, value in self.metadata_items.items(): + item = ydb_topic_pb2.MetadataItem(key=key, value=value) + proto.metadata_items.append(item) + if self.partitioning is None: pass elif isinstance(self.partitioning, StreamWriteMessage.PartitioningPartitionID): @@ -488,16 +493,19 @@ class StreamReadMessage: data: bytes uncompresed_size: int message_group_id: str + metadata_items: Dict[str, bytes] @staticmethod def from_proto( msg: ydb_topic_pb2.StreamReadMessage.ReadResponse.MessageData, ) -> "StreamReadMessage.ReadResponse.MessageData": + metadata_items = {meta.key: meta.value for meta in msg.metadata_items} return StreamReadMessage.ReadResponse.MessageData( offset=msg.offset, seq_no=msg.seq_no, created_at=msg.created_at.ToDatetime(), data=msg.data, + metadata_items=metadata_items, uncompresed_size=msg.uncompressed_size, message_group_id=msg.message_group_id, ) diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py index 01501638390..a9c811ac4fb 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py @@ -40,6 +40,7 @@ class PublicMessage(ICommittable, ISessionAlive): written_at: datetime.datetime producer_id: str data: Union[bytes, Any] # set as original decompressed bytes or deserialized object if deserializer set in reader + metadata_items: Dict[str, bytes] _partition_session: PartitionSession _commit_start_offset: int _commit_end_offset: int diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py index 351efb9a91c..e407fe01dae 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py @@ -627,6 +627,7 @@ class ReaderStream: written_at=server_batch.written_at, producer_id=server_batch.producer_id, data=message_data.data, + metadata_items=message_data.metadata_items, _partition_session=partition_session, _commit_start_offset=partition_session._next_message_start_commit_offset, _commit_end_offset=message_data.offset + 1, diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py index 527bf03eacb..aa5fe9749a7 100644 --- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py +++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py @@ -15,7 +15,7 @@ from .._grpc.grpcwrapper.common_utils import IToProto from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec from .. import connection -Message = typing.Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"] +Message = typing.Union["PublicMessage", "PublicMessage.SimpleSourceType"] @dataclass @@ -91,20 +91,23 @@ class PublicWriterInitInfo: class PublicMessage: seqno: Optional[int] created_at: Optional[datetime.datetime] - data: "PublicMessage.SimpleMessageSourceType" + data: "PublicMessage.SimpleSourceType" + metadata_items: Optional[Dict[str, "PublicMessage.SimpleSourceType"]] - SimpleMessageSourceType = Union[str, bytes] # Will be extend + SimpleSourceType = Union[str, bytes] # Will be extend def __init__( self, - data: SimpleMessageSourceType, + data: SimpleSourceType, *, + metadata_items: Optional[Dict[str, "PublicMessage.SimpleSourceType"]] = None, seqno: Optional[int] = None, created_at: Optional[datetime.datetime] = None, ): self.seqno = seqno self.created_at = created_at self.data = data + self.metadata_items = metadata_items @staticmethod def _create_message(data: Message) -> "PublicMessage": @@ -117,30 +120,37 @@ class InternalMessage(StreamWriteMessage.WriteRequest.MessageData, IToProto): codec: PublicCodec def __init__(self, mess: PublicMessage): + metadata_items = mess.metadata_items or {} super().__init__( seq_no=mess.seqno, created_at=mess.created_at, data=mess.data, + metadata_items=metadata_items, uncompressed_size=len(mess.data), partitioning=None, ) self.codec = PublicCodec.RAW - def get_bytes(self) -> bytes: - if self.data is None: + def _get_bytes(self, obj: Optional[PublicMessage.SimpleSourceType]) -> bytes: + if obj is None: return bytes() - if isinstance(self.data, bytes): - return self.data - if isinstance(self.data, str): - return self.data.encode("utf-8") + if isinstance(obj, bytes): + return obj + if isinstance(obj, str): + return obj.encode("utf-8") raise ValueError("Bad data type") + def get_data_bytes(self) -> bytes: + return self._get_bytes(self.data) + def to_message_data(self) -> StreamWriteMessage.WriteRequest.MessageData: - data = self.get_bytes() + data = self.get_data_bytes() + metadata_items = {key: self._get_bytes(value) for key, value in self.metadata_items.items()} return StreamWriteMessage.WriteRequest.MessageData( seq_no=self.seq_no, created_at=self.created_at, data=data, + metadata_items=metadata_items, uncompressed_size=len(data), partitioning=None, # unsupported by server now ) @@ -221,6 +231,7 @@ _message_data_overhead = ( seq_no=_max_int, created_at=datetime.datetime(3000, 1, 1, 1, 1, 1, 1), data=bytes(1), + metadata_items={}, uncompressed_size=_max_int, partitioning=StreamWriteMessage.PartitioningMessageGroupID( message_group_id="a" * 100, diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py index 869808f7ca2..32d8fefe51c 100644 --- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py @@ -427,7 +427,7 @@ class WriterAsyncIOReconnector: for message in messages: encoded_data_futures = eventloop.run_in_executor( - self._encode_executor, encoder_function, message.get_bytes() + self._encode_executor, encoder_function, message.get_data_bytes() ) encode_waiters.append(encoded_data_futures) @@ -493,7 +493,7 @@ class WriterAsyncIOReconnector: f = self._codec_functions[codec] for m in test_messages: - encoded = f(m.get_bytes()) + encoded = f(m.get_data_bytes()) s += len(encoded) return s diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index b40f83ba95b..bdc80c2111e 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.18.14" +VERSION = "3.18.15" |
