author:    robot-kikimr-dev <robot-kikimr-dev@yandex-team.com>  2023-12-08 13:51:54 +0300
committer: robot-kikimr-dev <robot-kikimr-dev@yandex-team.com>  2023-12-08 15:11:31 +0300
commit:    e1fd0f7bb4712ff4be116d4b16c2bac076cd1318 (patch)
tree:      24ac762600a8c46b29b088eab7e8dbc5e89043c9
parent:    ab9f19c8c2ce7ef36f054686c8a22d8c7e9133b3 (diff)
YDB SDK Sync from git
-rw-r--r--   ydb/public/sdk/python3/ydb/aio/pool.py       7
-rw-r--r--   ydb/public/sdk/python3/ydb/export.py         8
-rw-r--r--   ydb/public/sdk/python3/ydb/issues.py        26
-rw-r--r--   ydb/public/sdk/python3/ydb/pool.py           7
-rw-r--r--   ydb/public/sdk/python3/ydb/resolver.py      38
-rw-r--r--   ydb/public/sdk/python3/ydb/topic.py         24
-rw-r--r--   ydb/public/sdk/python3/ydb/types.py         68
-rw-r--r--   ydb/public/sdk/python3/ydb/ydb_version.py    2
8 files changed, 134 insertions(+), 46 deletions(-)
diff --git a/ydb/public/sdk/python3/ydb/aio/pool.py b/ydb/public/sdk/python3/ydb/aio/pool.py
index 6e95dd6fe1..c637a7ca96 100644
--- a/ydb/public/sdk/python3/ydb/aio/pool.py
+++ b/ydb/public/sdk/python3/ydb/aio/pool.py
@@ -1,11 +1,12 @@
import asyncio
import logging
import random
+import typing
from ydb import issues
from ydb.pool import ConnectionsCache as _ConnectionsCache, IConnectionPool
-from .connection import Connection
+from .connection import Connection, EndpointKey
from . import resolver
@@ -21,7 +22,7 @@ class ConnectionsCache(_ConnectionsCache):
self._fast_fail_error = None
- async def get(self, preferred_endpoint=None, fast_fail=False, wait_timeout=10):
+ async def get(self, preferred_endpoint: typing.Optional[EndpointKey] = None, fast_fail=False, wait_timeout=10):
if fast_fail:
await asyncio.wait_for(self._fast_fail_event.wait(), timeout=wait_timeout)
@@ -34,7 +35,7 @@ class ConnectionsCache(_ConnectionsCache):
return self.connections_by_node_id[preferred_endpoint.node_id]
if preferred_endpoint is not None and preferred_endpoint.endpoint in self.connections:
- return self.connections[preferred_endpoint]
+ return self.connections[preferred_endpoint.endpoint]
for conn_lst in self.conn_lst_order:
try:
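
Note on the aio/pool.py hunk above: ConnectionsCache.connections is keyed by the endpoint string, not by the EndpointKey object, so the preferred-endpoint fast path must index with preferred_endpoint.endpoint. A minimal standalone sketch of the lookup order the fixed get() implements; the EndpointKey class below is a hypothetical stand-in that assumes only the .endpoint and .node_id attributes the hunk uses, not the SDK class itself:

from dataclasses import dataclass
from typing import Optional

@dataclass
class EndpointKey:                       # hypothetical stand-in, not the SDK class
    endpoint: str                        # "host:port" string, used as the cache key
    node_id: Optional[int] = None

connections = {"node1.example:2135": "conn-a"}   # keyed by endpoint string
connections_by_node_id = {42: "conn-a"}          # keyed by node id

def pick(preferred: Optional[EndpointKey]):
    # Prefer the node-id index, then the endpoint-string index.
    if preferred is not None and preferred.node_id in connections_by_node_id:
        return connections_by_node_id[preferred.node_id]
    if preferred is not None and preferred.endpoint in connections:
        # Before the fix, indexing with the EndpointKey object itself always
        # missed, because the dict is keyed by the endpoint string.
        return connections[preferred.endpoint]
    return None

print(pick(EndpointKey("node1.example:2135")))   # -> conn-a
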
diff --git a/ydb/public/sdk/python3/ydb/export.py b/ydb/public/sdk/python3/ydb/export.py
index 1b75a6d53f..827925c7bc 100644
--- a/ydb/public/sdk/python3/ydb/export.py
+++ b/ydb/public/sdk/python3/ydb/export.py
@@ -83,6 +83,7 @@ class ExportToYTSettings(s_impl.BaseRequestSettings):
self.host = None
self.port = None
self.uid = None
+ self.use_type_v3 = False
def with_port(self, port):
self.port = port
@@ -118,6 +119,10 @@ class ExportToYTSettings(s_impl.BaseRequestSettings):
self.items.extend(items)
return self
+ def with_use_type_v3(self, use_type_v3):
+ self.use_type_v3 = use_type_v3
+ return self
+
class ExportToS3Settings(s_impl.BaseRequestSettings):
def __init__(self):
@@ -192,6 +197,9 @@ def _export_to_yt_request_factory(settings):
if settings.port:
request.settings.port = settings.port
+ if settings.use_type_v3:
+ request.settings.use_type_v3 = settings.use_type_v3
+
for source_path, destination_path in settings.items:
request.settings.items.add(source_path=source_path, destination_path=destination_path)
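
The export.py hunk adds a use_type_v3 flag to ExportToYTSettings, a with_use_type_v3() builder, and the corresponding field in the request factory. A hedged sketch of enabling it from client code; the ExportClient wiring and the other with_* helpers are assumptions based on the rest of the module and are not part of this diff:

from ydb import export

settings = export.ExportToYTSettings()
settings.with_port(80)                  # builder shown in the hunk above
settings.with_use_type_v3(True)         # new builder added by this commit
# host/token and (source, destination) items would be configured with the
# settings' other with_* helpers (assumed, not shown in this diff).

# Assumed client wiring, for illustration only:
# driver = ydb.Driver(endpoint="grpcs://ydb.example.net:2135", database="/my/db")
# operation = export.ExportClient(driver).export_to_yt(settings)
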
diff --git a/ydb/public/sdk/python3/ydb/issues.py b/ydb/public/sdk/python3/ydb/issues.py
index 66dba50843..065dcbc80c 100644
--- a/ydb/public/sdk/python3/ydb/issues.py
+++ b/ydb/public/sdk/python3/ydb/issues.py
@@ -1,10 +1,19 @@
# -*- coding: utf-8 -*-
+from __future__ import annotations
+
from google.protobuf import text_format
import enum
import queue
+import typing
from . import _apis
+# Workaround: use the typed v4 protos for IDE type checking, the common protos at runtime
+if typing.TYPE_CHECKING:
+ from ._grpc.v4.protos import ydb_issue_message_pb2, ydb_operation_pb2
+else:
+ from ._grpc.common.protos import ydb_issue_message_pb2, ydb_operation_pb2
+
_TRANSPORT_STATUSES_FIRST = 401000
_CLIENT_STATUSES_FIRST = 402000
@@ -44,10 +53,19 @@ class StatusCode(enum.IntEnum):
SESSION_POOL_EMPTY = _CLIENT_STATUSES_FIRST + 40
+# TODO: convert from proto IssueMessage
+class _IssueMessage:
+ def __init__(self, message: str, issue_code: int, severity: int, issues) -> None:
+ self.message = message
+ self.issue_code = issue_code
+ self.severity = severity
+ self.issues = issues
+
+
class Error(Exception):
status = None
- def __init__(self, message, issues=None):
+ def __init__(self, message: str, issues: typing.Optional[typing.Iterable[_IssueMessage]] = None):
super(Error, self).__init__(message)
self.issues = issues
self.message = message
@@ -166,14 +184,14 @@ class UnexpectedGrpcMessage(Error):
super().__init__(message)
-def _format_issues(issues):
+def _format_issues(issues: typing.Iterable[ydb_issue_message_pb2.IssueMessage]) -> str:
if not issues:
return ""
return " ,".join(text_format.MessageToString(issue, as_utf8=False, as_one_line=True) for issue in issues)
-def _format_response(response):
+def _format_response(response: ydb_operation_pb2.Operation) -> str:
fmt_issues = _format_issues(response.issues)
return f"{fmt_issues} (server_code: {response.status})"
@@ -202,7 +220,7 @@ _server_side_error_map = {
}
-def _process_response(response_proto):
+def _process_response(response_proto: ydb_operation_pb2.Operation) -> None:
if response_proto.status not in _success_status_codes:
exc_obj = _server_side_error_map.get(response_proto.status)
raise exc_obj(_format_response(response_proto), response_proto.issues)
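
The issues.py hunk types Error.__init__ and the _format_issues/_process_response helpers; the practical contract is that server-side errors raised by the SDK carry .message plus an optional iterable of issue messages. A hedged usage sketch; the session-pool setup and query helper are assumptions for illustration, not part of this diff:

import ydb

def run_query(pool: ydb.SessionPool, text: str):
    try:
        return pool.retry_operation_sync(
            lambda session: session.transaction().execute(text, commit_tx=True)
        )
    except ydb.issues.Error as e:
        # .message and .issues are the attributes annotated in the hunk above.
        print("query failed:", e.message)
        for issue in e.issues or []:
            print("  server issue:", issue)
        raise
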
diff --git a/ydb/public/sdk/python3/ydb/pool.py b/ydb/public/sdk/python3/ydb/pool.py
index e0bf2f1587..1e75950ea8 100644
--- a/ydb/public/sdk/python3/ydb/pool.py
+++ b/ydb/public/sdk/python3/ydb/pool.py
@@ -5,11 +5,12 @@ import logging
from concurrent import futures
import collections
import random
+import typing
from . import connection as connection_impl, issues, resolver, _utilities, tracing
from abc import abstractmethod
-from .connection import Connection
+from .connection import Connection, EndpointKey
logger = logging.getLogger(__name__)
@@ -123,13 +124,13 @@ class ConnectionsCache(object):
return subscription
@tracing.with_trace()
- def get(self, preferred_endpoint=None) -> Connection:
+ def get(self, preferred_endpoint: typing.Optional[EndpointKey] = None) -> Connection:
with self.lock:
if preferred_endpoint is not None and preferred_endpoint.node_id in self.connections_by_node_id:
return self.connections_by_node_id[preferred_endpoint.node_id]
if preferred_endpoint is not None and preferred_endpoint.endpoint in self.connections:
- return self.connections[preferred_endpoint]
+ return self.connections[preferred_endpoint.endpoint]
for conn_lst in self.conn_lst_order:
try:
diff --git a/ydb/public/sdk/python3/ydb/resolver.py b/ydb/public/sdk/python3/ydb/resolver.py
index d4fb1aff76..b795af928c 100644
--- a/ydb/public/sdk/python3/ydb/resolver.py
+++ b/ydb/public/sdk/python3/ydb/resolver.py
@@ -1,10 +1,21 @@
# -*- coding: utf-8 -*-
+from __future__ import annotations
+
import contextlib
import logging
import threading
import random
import itertools
-from . import connection as conn_impl, issues, settings as settings_impl, _apis
+import typing
+from . import connection as conn_impl, driver, issues, settings as settings_impl, _apis
+
+
+# Workaround: use the typed v4 protos for IDE type checking, the common protos at runtime
+if typing.TYPE_CHECKING:
+ from ._grpc.v4.protos import ydb_discovery_pb2
+else:
+ from ._grpc.common.protos import ydb_discovery_pb2
+
logger = logging.getLogger(__name__)
@@ -22,7 +33,7 @@ class EndpointInfo(object):
"node_id",
)
- def __init__(self, endpoint_info):
+ def __init__(self, endpoint_info: ydb_discovery_pb2.EndpointInfo):
self.address = endpoint_info.address
self.endpoint = "%s:%s" % (endpoint_info.address, endpoint_info.port)
self.location = endpoint_info.location
@@ -33,7 +44,7 @@ class EndpointInfo(object):
self.ssl_target_name_override = endpoint_info.ssl_target_name_override
self.node_id = endpoint_info.node_id
- def endpoints_with_options(self):
+ def endpoints_with_options(self) -> typing.Generator[typing.Tuple[str, conn_impl.EndpointOptions], None, None]:
ssl_target_name_override = None
if self.ssl:
if self.ssl_target_name_override:
@@ -73,14 +84,14 @@ class EndpointInfo(object):
return self.endpoint == other.endpoint
-def _list_endpoints_request_factory(connection_params):
+def _list_endpoints_request_factory(connection_params: driver.DriverConfig) -> _apis.ydb_discovery.ListEndpointsRequest:
request = _apis.ydb_discovery.ListEndpointsRequest()
request.database = connection_params.database
return request
class DiscoveryResult(object):
- def __init__(self, self_location, endpoints):
+ def __init__(self, self_location: str, endpoints: "list[EndpointInfo]"):
self.self_location = self_location
self.endpoints = endpoints
@@ -94,7 +105,12 @@ class DiscoveryResult(object):
return self.__str__()
@classmethod
- def from_response(cls, rpc_state, response, use_all_nodes=False):
+ def from_response(
+ cls,
+ rpc_state: conn_impl._RpcState,
+ response: ydb_discovery_pb2.ListEndpointsResponse,
+ use_all_nodes: bool = False,
+ ) -> DiscoveryResult:
issues._process_response(response.operation)
message = _apis.ydb_discovery.ListEndpointsResult()
response.operation.result.Unpack(message)
@@ -123,7 +139,7 @@ class DiscoveryResult(object):
class DiscoveryEndpointsResolver(object):
- def __init__(self, driver_config):
+ def __init__(self, driver_config: driver.DriverConfig):
self.logger = logger.getChild(self.__class__.__name__)
self._driver_config = driver_config
self._ready_timeout = getattr(self._driver_config, "discovery_request_timeout", 10)
@@ -136,7 +152,7 @@ class DiscoveryEndpointsResolver(object):
random.shuffle(self._endpoints)
self._endpoints_iter = itertools.cycle(self._endpoints)
- def _add_debug_details(self, message, *args):
+ def _add_debug_details(self, message: str, *args):
self.logger.debug(message, *args)
message = message % args
with self._lock:
@@ -144,19 +160,19 @@ class DiscoveryEndpointsResolver(object):
if len(self._debug_details_items) > self._debug_details_history_size:
self._debug_details_items.pop()
- def debug_details(self):
+ def debug_details(self) -> str:
"""
Returns last resolver errors as a debug string.
"""
with self._lock:
return "\n".join(self._debug_details_items)
- def resolve(self):
+ def resolve(self) -> typing.ContextManager[typing.Optional[DiscoveryResult]]:
with self.context_resolve() as result:
return result
@contextlib.contextmanager
- def context_resolve(self):
+ def context_resolve(self) -> typing.ContextManager[typing.Optional[DiscoveryResult]]:
self.logger.debug("Preparing initial endpoint to resolve endpoints")
endpoint = next(self._endpoints_iter)
initial = conn_impl.Connection.ready_factory(endpoint, self._driver_config, ready_timeout=self._ready_timeout)
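
The resolver.py hunk annotates resolve()/context_resolve() as producing an optional DiscoveryResult and types EndpointInfo against the discovery protos. A hedged sketch of consuming the resolver directly; the DriverConfig endpoint and database are placeholders:

import ydb
from ydb import resolver

driver_config = ydb.DriverConfig(               # placeholder endpoint/database
    endpoint="grpc://localhost:2136", database="/local"
)

r = resolver.DiscoveryEndpointsResolver(driver_config)
with r.context_resolve() as result:             # DiscoveryResult or None, per the annotations
    if result is None:
        print("discovery failed:\n" + r.debug_details())
    else:
        print("self location:", result.self_location)
        for ep in result.endpoints:
            print(ep.endpoint, "node_id =", ep.node_id)
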
diff --git a/ydb/public/sdk/python3/ydb/topic.py b/ydb/public/sdk/python3/ydb/topic.py
index 00ffb1c479..2175af47f7 100644
--- a/ydb/public/sdk/python3/ydb/topic.py
+++ b/ydb/public/sdk/python3/ydb/topic.py
@@ -170,8 +170,11 @@ class TopicClientAsyncIO:
consumer: str,
buffer_size_bytes: int = 50 * 1024 * 1024,
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
+ # the func may be called from multiple threads in parallel
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
- decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
+ # custom executor used to call built-in and custom decoders. If None, the shared executor pool is used.
+ # if the executor has max_workers=1, decoders are called from that single thread without parallelism
+ decoder_executor: Optional[concurrent.futures.Executor] = None,
) -> TopicReaderAsyncIO:
if not decoder_executor:
@@ -194,8 +197,12 @@ class TopicClientAsyncIO:
auto_seqno: bool = True,
auto_created_at: bool = True,
codec: Optional[TopicCodec] = None, # default mean auto-select
+ # encoders: map[codec_code] func(encoded_bytes)->decoded_bytes
+ # the func may be called from multiple threads in parallel.
encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None,
- encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
+ # custom executor used to call built-in and custom encoders. If None, the shared executor pool is used.
+ # If the executor has max_workers=1, encoders are called from that single thread without parallelism.
+ encoder_executor: Optional[concurrent.futures.Executor] = None,
) -> TopicWriterAsyncIO:
args = locals().copy()
del args["self"]
@@ -319,7 +326,10 @@ class TopicClient:
consumer: str,
buffer_size_bytes: int = 50 * 1024 * 1024,
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
+ # the func may be called from multiple threads in parallel
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
+ # custom executor used to call built-in and custom decoders. If None, the shared executor pool is used.
+ # if the executor has max_workers=1, decoders are called from that single thread without parallelism
decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
) -> TopicReader:
if not decoder_executor:
@@ -343,7 +353,11 @@ class TopicClient:
auto_seqno: bool = True,
auto_created_at: bool = True,
codec: Optional[TopicCodec] = None, # default mean auto-select
+ # encoders: map[codec_code] func(encoded_bytes)->decoded_bytes
+ # the func may be called from multiple threads in parallel.
encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None,
+ # custom executor used to call built-in and custom encoders. If None, the shared executor pool is used.
+ # If the executor has max_workers=1, encoders are called from that single thread without parallelism.
encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
) -> TopicWriter:
args = locals().copy()
@@ -373,7 +387,11 @@ class TopicClient:
@dataclass
class TopicClientSettings:
- encode_decode_threads_count: int = 4
+ # ATTENTION
+ # When encode_decode_threads_count is set to more than 1, all custom encoders/decoders
+ # for topic readers/writers MUST be thread-safe,
+ # because they will be called from parallel threads
+ encode_decode_threads_count: int = 1
class TopicError(issues.Error):
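
The topic.py hunks document the threading contract for custom codecs: decoders/encoders run on the (configurable) executor, and encode_decode_threads_count now defaults to 1, so user-supplied codecs are not called in parallel unless that is raised. A hedged sketch of a reader with a custom gzip decoder on a single-worker executor; the topic path, consumer name, codec mapping, and driver wiring are placeholders/assumptions, not part of this diff:

import concurrent.futures
import gzip

import ydb
from ydb.topic import TopicCodec

driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")  # placeholder
driver.wait(timeout=5)

# max_workers=1 => decoders are called without parallelism, so they do not
# need to be thread-safe (per the comments added in the hunks above).
decoder_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

# driver.topic_client is the assumed entry point to the TopicClient shown above.
reader = driver.topic_client.reader(
    "/local/my-topic",                                 # placeholder topic path
    consumer="my-consumer",                            # placeholder consumer
    decoders={TopicCodec.GZIP: gzip.decompress},       # codec_code -> decode func
    decoder_executor=decoder_executor,
)

msg = reader.receive_message()
print(msg.data)
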
diff --git a/ydb/public/sdk/python3/ydb/types.py b/ydb/public/sdk/python3/ydb/types.py
index 9771e037f7..cf13aac0a2 100644
--- a/ydb/public/sdk/python3/ydb/types.py
+++ b/ydb/public/sdk/python3/ydb/types.py
@@ -1,76 +1,94 @@
# -*- coding: utf-8 -*-
+from __future__ import annotations
+
import abc
import enum
import json
from . import _utilities, _apis
from datetime import date, datetime, timedelta
+import typing
import uuid
import struct
from google.protobuf import struct_pb2
+from . import table
+
+
+# Workaround: use the typed v4 protos for IDE type checking, the common protos at runtime
+if typing.TYPE_CHECKING:
+ from ._grpc.v4.protos import ydb_value_pb2
+else:
+ from ._grpc.common.protos import ydb_value_pb2
+
_SECONDS_IN_DAY = 60 * 60 * 24
_EPOCH = datetime(1970, 1, 1)
-def _from_date(x, table_client_settings):
+def _from_date(x: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings) -> typing.Union[date, int]:
if table_client_settings is not None and table_client_settings._native_date_in_result_sets:
return _EPOCH.date() + timedelta(days=x.uint32_value)
return x.uint32_value
-def _to_date(pb, value):
+def _to_date(pb: ydb_value_pb2.Value, value: typing.Union[date, int]) -> None:
if isinstance(value, date):
pb.uint32_value = (value - _EPOCH.date()).days
else:
pb.uint32_value = value
-def _from_datetime_number(x, table_client_settings):
+def _from_datetime_number(
+ x: typing.Union[float, datetime], table_client_settings: table.TableClientSettings
+) -> datetime:
if table_client_settings is not None and table_client_settings._native_datetime_in_result_sets:
return datetime.utcfromtimestamp(x)
return x
-def _from_json(x, table_client_settings):
+def _from_json(x: typing.Union[str, bytearray, bytes], table_client_settings: table.TableClientSettings):
if table_client_settings is not None and table_client_settings._native_json_in_result_sets:
return json.loads(x)
return x
-def _to_uuid(value_pb, table_client_settings):
+def _to_uuid(value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings) -> uuid.UUID:
return uuid.UUID(bytes_le=struct.pack("QQ", value_pb.low_128, value_pb.high_128))
-def _from_uuid(pb, value):
+def _from_uuid(pb: ydb_value_pb2.Value, value: uuid.UUID):
pb.low_128 = struct.unpack("Q", value.bytes_le[0:8])[0]
pb.high_128 = struct.unpack("Q", value.bytes_le[8:16])[0]
-def _from_interval(value_pb, table_client_settings):
+def _from_interval(
+ value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings
+) -> typing.Union[timedelta, int]:
if table_client_settings is not None and table_client_settings._native_interval_in_result_sets:
return timedelta(microseconds=value_pb.int64_value)
return value_pb.int64_value
-def _timedelta_to_microseconds(value):
+def _timedelta_to_microseconds(value: timedelta) -> int:
return (value.days * _SECONDS_IN_DAY + value.seconds) * 1000000 + value.microseconds
-def _to_interval(pb, value):
+def _to_interval(pb: ydb_value_pb2.Value, value: typing.Union[timedelta, int]):
if isinstance(value, timedelta):
pb.int64_value = _timedelta_to_microseconds(value)
else:
pb.int64_value = value
-def _from_timestamp(value_pb, table_client_settings):
+def _from_timestamp(
+ value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings
+) -> typing.Union[datetime, int]:
if table_client_settings is not None and table_client_settings._native_timestamp_in_result_sets:
return _EPOCH + timedelta(microseconds=value_pb.uint64_value)
return value_pb.uint64_value
-def _to_timestamp(pb, value):
+def _to_timestamp(pb: ydb_value_pb2.Value, value: typing.Union[datetime, int]):
if isinstance(value, datetime):
pb.uint64_value = _timedelta_to_microseconds(value - _EPOCH)
else:
@@ -129,13 +147,15 @@ class PrimitiveType(enum.Enum):
DyNumber = _apis.primitive_types.DYNUMBER, "text_value"
- def __init__(self, idn, proto_field, to_obj=None, from_obj=None):
+ def __init__(
+ self, idn: ydb_value_pb2.Type.PrimitiveTypeId, proto_field: typing.Optional[str], to_obj=None, from_obj=None
+ ):
self._idn_ = idn
self._to_obj = to_obj
self._from_obj = from_obj
self._proto_field = proto_field
- def get_value(self, value_pb, table_client_settings):
+ def get_value(self, value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings):
"""
Extracts value from protocol buffer
:param value_pb: A protocol buffer
@@ -149,7 +169,7 @@ class PrimitiveType(enum.Enum):
return getattr(value_pb, self._proto_field)
- def set_value(self, pb, value):
+ def set_value(self, pb: ydb_value_pb2.Value, value):
"""
Sets value in a protocol buffer
:param pb: A protocol buffer
@@ -176,7 +196,9 @@ class PrimitiveType(enum.Enum):
class DataQuery(object):
__slots__ = ("yql_text", "parameters_types", "name")
- def __init__(self, query_id, parameters_types, name=None):
+ def __init__(
+ self, query_id: str, parameters_types: "dict[str, ydb_value_pb2.Type]", name: typing.Optional[str] = None
+ ):
self.yql_text = query_id
self.parameters_types = parameters_types
self.name = _utilities.get_query_hash(self.yql_text) if name is None else name
@@ -259,7 +281,7 @@ class NullType(AbstractTypeBuilder):
class OptionalType(AbstractTypeBuilder):
__slots__ = ("_repr", "_proto", "_item")
- def __init__(self, optional_type):
+ def __init__(self, optional_type: typing.Union[AbstractTypeBuilder, PrimitiveType]):
"""
Represents optional type that wraps inner type
:param optional_type: An instance of an inner type
@@ -291,7 +313,7 @@ class OptionalType(AbstractTypeBuilder):
class ListType(AbstractTypeBuilder):
__slots__ = ("_repr", "_proto")
- def __init__(self, list_type):
+ def __init__(self, list_type: typing.Union[AbstractTypeBuilder, PrimitiveType]):
"""
:param list_type: List item type builder
"""
@@ -313,7 +335,11 @@ class ListType(AbstractTypeBuilder):
class DictType(AbstractTypeBuilder):
__slots__ = ("__repr", "__proto")
- def __init__(self, key_type, payload_type):
+ def __init__(
+ self,
+ key_type: typing.Union[AbstractTypeBuilder, PrimitiveType],
+ payload_type: typing.Union[AbstractTypeBuilder, PrimitiveType],
+ ):
"""
:param key_type: Key type builder
:param payload_type: Payload type builder
@@ -341,7 +367,7 @@ class TupleType(AbstractTypeBuilder):
self.__elements_repr = []
self.__proto = _apis.ydb_value.Type(tuple_type=_apis.ydb_value.TupleType())
- def add_element(self, element_type):
+ def add_element(self, element_type: typing.Union[AbstractTypeBuilder, PrimitiveType]):
"""
:param element_type: Adds additional element of tuple
:return: self
@@ -366,7 +392,7 @@ class StructType(AbstractTypeBuilder):
self.__members_repr = []
self.__proto = _apis.ydb_value.Type(struct_type=_apis.ydb_value.StructType())
- def add_member(self, name, member_type):
+ def add_member(self, name: str, member_type: typing.Union[AbstractTypeBuilder, PrimitiveType]):
"""
:param name:
:param member_type:
@@ -393,7 +419,7 @@ class BulkUpsertColumns(AbstractTypeBuilder):
self.__columns_repr = []
self.__proto = _apis.ydb_value.Type(struct_type=_apis.ydb_value.StructType())
- def add_column(self, name, column_type):
+ def add_column(self, name: str, column_type: typing.Union[AbstractTypeBuilder, PrimitiveType]):
"""
:param name: A column name
:param column_type: A column type
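
The types.py hunk is mostly annotations, but it also spells out the composition contract: every container builder (OptionalType, ListType, DictType, TupleType, StructType, BulkUpsertColumns) accepts either an AbstractTypeBuilder or a PrimitiveType. A short sketch of composing them into a declared parameter type; the query text and parameter name are illustrative only:

import ydb

person_type = ydb.StructType()
person_type.add_member("name", ydb.PrimitiveType.Utf8)
person_type.add_member("age", ydb.OptionalType(ydb.PrimitiveType.Uint32))

people_type = ydb.ListType(person_type)

# Illustrative query that would consume the declared parameter type:
query = """
DECLARE $people AS List<Struct<name: Utf8, age: Uint32?>>;
SELECT COUNT(*) AS cnt FROM AS_TABLE($people);
"""
parameters_types = {"$people": people_type}
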
diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py
index 46c424d30a..709cb0a9e5 100644
--- a/ydb/public/sdk/python3/ydb/ydb_version.py
+++ b/ydb/public/sdk/python3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.5.0"
+VERSION = "3.7.0"