summaryrefslogtreecommitdiffstats
path: root/contrib/python
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-01-23 12:01:52 +0300
committerrobot-piglet <[email protected]>2025-01-23 12:42:32 +0300
commit46897b2fced76e53beb9697beef059641df6d1a7 (patch)
tree3cb9b20af890c2ccc9b7f1da888aac9c80e6dcb4 /contrib/python
parentbbb074a7dde8ea7635f0b7e99813d1f82c356738 (diff)
Intermediate changes
commit_hash:8ae2695b8328c208689a529daeadc39dade0aacf
Diffstat (limited to 'contrib/python')
-rw-r--r--contrib/python/ydb/py3/.dist-info/METADATA2
-rw-r--r--contrib/python/ydb/py3/ya.make2
-rw-r--r--contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py8
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py1
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py1
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py33
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py4
-rw-r--r--contrib/python/ydb/py3/ydb/ydb_version.py2
8 files changed, 37 insertions, 16 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA
index 391e0b3f96f..181a3da9c53 100644
--- a/contrib/python/ydb/py3/.dist-info/METADATA
+++ b/contrib/python/ydb/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
-Version: 3.18.14
+Version: 3.18.15
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make
index e0f0e9ce8a7..1da9bb152cd 100644
--- a/contrib/python/ydb/py3/ya.make
+++ b/contrib/python/ydb/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.18.14)
+VERSION(3.18.15)
LICENSE(Apache-2.0)
diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py
index 634ffb536b6..972003989c6 100644
--- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py
+++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py
@@ -207,6 +207,7 @@ class StreamWriteMessage:
data: bytes
uncompressed_size: int
partitioning: "StreamWriteMessage.PartitioningType"
+ metadata_items: Dict[str, bytes]
def to_proto(
self,
@@ -217,6 +218,10 @@ class StreamWriteMessage:
proto.data = self.data
proto.uncompressed_size = self.uncompressed_size
+ for key, value in self.metadata_items.items():
+ item = ydb_topic_pb2.MetadataItem(key=key, value=value)
+ proto.metadata_items.append(item)
+
if self.partitioning is None:
pass
elif isinstance(self.partitioning, StreamWriteMessage.PartitioningPartitionID):
@@ -488,16 +493,19 @@ class StreamReadMessage:
data: bytes
uncompresed_size: int
message_group_id: str
+ metadata_items: Dict[str, bytes]
@staticmethod
def from_proto(
msg: ydb_topic_pb2.StreamReadMessage.ReadResponse.MessageData,
) -> "StreamReadMessage.ReadResponse.MessageData":
+ metadata_items = {meta.key: meta.value for meta in msg.metadata_items}
return StreamReadMessage.ReadResponse.MessageData(
offset=msg.offset,
seq_no=msg.seq_no,
created_at=msg.created_at.ToDatetime(),
data=msg.data,
+ metadata_items=metadata_items,
uncompresed_size=msg.uncompressed_size,
message_group_id=msg.message_group_id,
)
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py
index 01501638390..a9c811ac4fb 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py
@@ -40,6 +40,7 @@ class PublicMessage(ICommittable, ISessionAlive):
written_at: datetime.datetime
producer_id: str
data: Union[bytes, Any] # set as original decompressed bytes or deserialized object if deserializer set in reader
+ metadata_items: Dict[str, bytes]
_partition_session: PartitionSession
_commit_start_offset: int
_commit_end_offset: int
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
index 351efb9a91c..e407fe01dae 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
@@ -627,6 +627,7 @@ class ReaderStream:
written_at=server_batch.written_at,
producer_id=server_batch.producer_id,
data=message_data.data,
+ metadata_items=message_data.metadata_items,
_partition_session=partition_session,
_commit_start_offset=partition_session._next_message_start_commit_offset,
_commit_end_offset=message_data.offset + 1,
diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py
index 527bf03eacb..aa5fe9749a7 100644
--- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py
+++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py
@@ -15,7 +15,7 @@ from .._grpc.grpcwrapper.common_utils import IToProto
from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec
from .. import connection
-Message = typing.Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"]
+Message = typing.Union["PublicMessage", "PublicMessage.SimpleSourceType"]
@dataclass
@@ -91,20 +91,23 @@ class PublicWriterInitInfo:
class PublicMessage:
seqno: Optional[int]
created_at: Optional[datetime.datetime]
- data: "PublicMessage.SimpleMessageSourceType"
+ data: "PublicMessage.SimpleSourceType"
+ metadata_items: Optional[Dict[str, "PublicMessage.SimpleSourceType"]]
- SimpleMessageSourceType = Union[str, bytes] # Will be extend
+ SimpleSourceType = Union[str, bytes] # Will be extend
def __init__(
self,
- data: SimpleMessageSourceType,
+ data: SimpleSourceType,
*,
+ metadata_items: Optional[Dict[str, "PublicMessage.SimpleSourceType"]] = None,
seqno: Optional[int] = None,
created_at: Optional[datetime.datetime] = None,
):
self.seqno = seqno
self.created_at = created_at
self.data = data
+ self.metadata_items = metadata_items
@staticmethod
def _create_message(data: Message) -> "PublicMessage":
@@ -117,30 +120,37 @@ class InternalMessage(StreamWriteMessage.WriteRequest.MessageData, IToProto):
codec: PublicCodec
def __init__(self, mess: PublicMessage):
+ metadata_items = mess.metadata_items or {}
super().__init__(
seq_no=mess.seqno,
created_at=mess.created_at,
data=mess.data,
+ metadata_items=metadata_items,
uncompressed_size=len(mess.data),
partitioning=None,
)
self.codec = PublicCodec.RAW
- def get_bytes(self) -> bytes:
- if self.data is None:
+ def _get_bytes(self, obj: Optional[PublicMessage.SimpleSourceType]) -> bytes:
+ if obj is None:
return bytes()
- if isinstance(self.data, bytes):
- return self.data
- if isinstance(self.data, str):
- return self.data.encode("utf-8")
+ if isinstance(obj, bytes):
+ return obj
+ if isinstance(obj, str):
+ return obj.encode("utf-8")
raise ValueError("Bad data type")
+ def get_data_bytes(self) -> bytes:
+ return self._get_bytes(self.data)
+
def to_message_data(self) -> StreamWriteMessage.WriteRequest.MessageData:
- data = self.get_bytes()
+ data = self.get_data_bytes()
+ metadata_items = {key: self._get_bytes(value) for key, value in self.metadata_items.items()}
return StreamWriteMessage.WriteRequest.MessageData(
seq_no=self.seq_no,
created_at=self.created_at,
data=data,
+ metadata_items=metadata_items,
uncompressed_size=len(data),
partitioning=None, # unsupported by server now
)
@@ -221,6 +231,7 @@ _message_data_overhead = (
seq_no=_max_int,
created_at=datetime.datetime(3000, 1, 1, 1, 1, 1, 1),
data=bytes(1),
+ metadata_items={},
uncompressed_size=_max_int,
partitioning=StreamWriteMessage.PartitioningMessageGroupID(
message_group_id="a" * 100,
diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py
index 869808f7ca2..32d8fefe51c 100644
--- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py
+++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py
@@ -427,7 +427,7 @@ class WriterAsyncIOReconnector:
for message in messages:
encoded_data_futures = eventloop.run_in_executor(
- self._encode_executor, encoder_function, message.get_bytes()
+ self._encode_executor, encoder_function, message.get_data_bytes()
)
encode_waiters.append(encoded_data_futures)
@@ -493,7 +493,7 @@ class WriterAsyncIOReconnector:
f = self._codec_functions[codec]
for m in test_messages:
- encoded = f(m.get_bytes())
+ encoded = f(m.get_data_bytes())
s += len(encoded)
return s
diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py
index b40f83ba95b..bdc80c2111e 100644
--- a/contrib/python/ydb/py3/ydb/ydb_version.py
+++ b/contrib/python/ydb/py3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.18.14"
+VERSION = "3.18.15"