diff options
author | robot-kikimr-dev <robot-kikimr-dev@yandex-team.com> | 2023-12-08 13:51:54 +0300 |
---|---|---|
committer | robot-kikimr-dev <robot-kikimr-dev@yandex-team.com> | 2023-12-08 15:11:31 +0300 |
commit | e1fd0f7bb4712ff4be116d4b16c2bac076cd1318 (patch) | |
tree | 24ac762600a8c46b29b088eab7e8dbc5e89043c9 | |
parent | ab9f19c8c2ce7ef36f054686c8a22d8c7e9133b3 (diff) | |
download | ydb-e1fd0f7bb4712ff4be116d4b16c2bac076cd1318.tar.gz |
YDB SDK Sync from git
-rw-r--r-- | ydb/public/sdk/python3/ydb/aio/pool.py | 7 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/export.py | 8 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/issues.py | 26 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/pool.py | 7 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/resolver.py | 38 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/topic.py | 24 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/types.py | 68 | ||||
-rw-r--r-- | ydb/public/sdk/python3/ydb/ydb_version.py | 2 |
8 files changed, 134 insertions, 46 deletions
diff --git a/ydb/public/sdk/python3/ydb/aio/pool.py b/ydb/public/sdk/python3/ydb/aio/pool.py index 6e95dd6fe1..c637a7ca96 100644 --- a/ydb/public/sdk/python3/ydb/aio/pool.py +++ b/ydb/public/sdk/python3/ydb/aio/pool.py @@ -1,11 +1,12 @@ import asyncio import logging import random +import typing from ydb import issues from ydb.pool import ConnectionsCache as _ConnectionsCache, IConnectionPool -from .connection import Connection +from .connection import Connection, EndpointKey from . import resolver @@ -21,7 +22,7 @@ class ConnectionsCache(_ConnectionsCache): self._fast_fail_error = None - async def get(self, preferred_endpoint=None, fast_fail=False, wait_timeout=10): + async def get(self, preferred_endpoint: typing.Optional[EndpointKey] = None, fast_fail=False, wait_timeout=10): if fast_fail: await asyncio.wait_for(self._fast_fail_event.wait(), timeout=wait_timeout) @@ -34,7 +35,7 @@ class ConnectionsCache(_ConnectionsCache): return self.connections_by_node_id[preferred_endpoint.node_id] if preferred_endpoint is not None and preferred_endpoint.endpoint in self.connections: - return self.connections[preferred_endpoint] + return self.connections[preferred_endpoint.endpoint] for conn_lst in self.conn_lst_order: try: diff --git a/ydb/public/sdk/python3/ydb/export.py b/ydb/public/sdk/python3/ydb/export.py index 1b75a6d53f..827925c7bc 100644 --- a/ydb/public/sdk/python3/ydb/export.py +++ b/ydb/public/sdk/python3/ydb/export.py @@ -83,6 +83,7 @@ class ExportToYTSettings(s_impl.BaseRequestSettings): self.host = None self.port = None self.uid = None + self.use_type_v3 = False def with_port(self, port): self.port = port @@ -118,6 +119,10 @@ class ExportToYTSettings(s_impl.BaseRequestSettings): self.items.extend(items) return self + def with_use_type_v3(self, use_type_v3): + self.use_type_v3 = use_type_v3 + return self + class ExportToS3Settings(s_impl.BaseRequestSettings): def __init__(self): @@ -192,6 +197,9 @@ def _export_to_yt_request_factory(settings): if settings.port: request.settings.port = settings.port + if settings.use_type_v3: + request.settings.use_type_v3 = settings.use_type_v3 + for source_path, destination_path in settings.items: request.settings.items.add(source_path=source_path, destination_path=destination_path) diff --git a/ydb/public/sdk/python3/ydb/issues.py b/ydb/public/sdk/python3/ydb/issues.py index 66dba50843..065dcbc80c 100644 --- a/ydb/public/sdk/python3/ydb/issues.py +++ b/ydb/public/sdk/python3/ydb/issues.py @@ -1,10 +1,19 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + from google.protobuf import text_format import enum import queue +import typing from . import _apis +# Workaround for good IDE and universal for runtime +if typing.TYPE_CHECKING: + from _grpc.v4.protos import ydb_issue_message_pb2, ydb_operation_pb2 +else: + from ._grpc.common.protos import ydb_issue_message_pb2, ydb_operation_pb2 + _TRANSPORT_STATUSES_FIRST = 401000 _CLIENT_STATUSES_FIRST = 402000 @@ -44,10 +53,19 @@ class StatusCode(enum.IntEnum): SESSION_POOL_EMPTY = _CLIENT_STATUSES_FIRST + 40 +# TODO: convert from proto IssueMessage +class _IssueMessage: + def __init__(self, message: str, issue_code: int, severity: int, issues) -> None: + self.message = message + self.issue_code = issue_code + self.severity = severity + self.issues = issues + + class Error(Exception): status = None - def __init__(self, message, issues=None): + def __init__(self, message: str, issues: typing.Optional[typing.Iterable[_IssueMessage]] = None): super(Error, self).__init__(message) self.issues = issues self.message = message @@ -166,14 +184,14 @@ class UnexpectedGrpcMessage(Error): super().__init__(message) -def _format_issues(issues): +def _format_issues(issues: typing.Iterable[ydb_issue_message_pb2.IssueMessage]) -> str: if not issues: return "" return " ,".join(text_format.MessageToString(issue, as_utf8=False, as_one_line=True) for issue in issues) -def _format_response(response): +def _format_response(response: ydb_operation_pb2.Operation) -> str: fmt_issues = _format_issues(response.issues) return f"{fmt_issues} (server_code: {response.status})" @@ -202,7 +220,7 @@ _server_side_error_map = { } -def _process_response(response_proto): +def _process_response(response_proto: ydb_operation_pb2.Operation) -> None: if response_proto.status not in _success_status_codes: exc_obj = _server_side_error_map.get(response_proto.status) raise exc_obj(_format_response(response_proto), response_proto.issues) diff --git a/ydb/public/sdk/python3/ydb/pool.py b/ydb/public/sdk/python3/ydb/pool.py index e0bf2f1587..1e75950ea8 100644 --- a/ydb/public/sdk/python3/ydb/pool.py +++ b/ydb/public/sdk/python3/ydb/pool.py @@ -5,11 +5,12 @@ import logging from concurrent import futures import collections import random +import typing from . import connection as connection_impl, issues, resolver, _utilities, tracing from abc import abstractmethod -from .connection import Connection +from .connection import Connection, EndpointKey logger = logging.getLogger(__name__) @@ -123,13 +124,13 @@ class ConnectionsCache(object): return subscription @tracing.with_trace() - def get(self, preferred_endpoint=None) -> Connection: + def get(self, preferred_endpoint: typing.Optional[EndpointKey] = None) -> Connection: with self.lock: if preferred_endpoint is not None and preferred_endpoint.node_id in self.connections_by_node_id: return self.connections_by_node_id[preferred_endpoint.node_id] if preferred_endpoint is not None and preferred_endpoint.endpoint in self.connections: - return self.connections[preferred_endpoint] + return self.connections[preferred_endpoint.endpoint] for conn_lst in self.conn_lst_order: try: diff --git a/ydb/public/sdk/python3/ydb/resolver.py b/ydb/public/sdk/python3/ydb/resolver.py index d4fb1aff76..b795af928c 100644 --- a/ydb/public/sdk/python3/ydb/resolver.py +++ b/ydb/public/sdk/python3/ydb/resolver.py @@ -1,10 +1,21 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import contextlib import logging import threading import random import itertools -from . import connection as conn_impl, issues, settings as settings_impl, _apis +import typing +from . import connection as conn_impl, driver, issues, settings as settings_impl, _apis + + +# Workaround for good IDE and universal for runtime +if typing.TYPE_CHECKING: + from ._grpc.v4.protos import ydb_discovery_pb2 +else: + from ._grpc.common.protos import ydb_discovery_pb2 + logger = logging.getLogger(__name__) @@ -22,7 +33,7 @@ class EndpointInfo(object): "node_id", ) - def __init__(self, endpoint_info): + def __init__(self, endpoint_info: ydb_discovery_pb2.EndpointInfo): self.address = endpoint_info.address self.endpoint = "%s:%s" % (endpoint_info.address, endpoint_info.port) self.location = endpoint_info.location @@ -33,7 +44,7 @@ class EndpointInfo(object): self.ssl_target_name_override = endpoint_info.ssl_target_name_override self.node_id = endpoint_info.node_id - def endpoints_with_options(self): + def endpoints_with_options(self) -> typing.Generator[typing.Tuple[str, conn_impl.EndpointOptions], None, None]: ssl_target_name_override = None if self.ssl: if self.ssl_target_name_override: @@ -73,14 +84,14 @@ class EndpointInfo(object): return self.endpoint == other.endpoint -def _list_endpoints_request_factory(connection_params): +def _list_endpoints_request_factory(connection_params: driver.DriverConfig) -> _apis.ydb_discovery.ListEndpointsRequest: request = _apis.ydb_discovery.ListEndpointsRequest() request.database = connection_params.database return request class DiscoveryResult(object): - def __init__(self, self_location, endpoints): + def __init__(self, self_location: str, endpoints: "list[EndpointInfo]"): self.self_location = self_location self.endpoints = endpoints @@ -94,7 +105,12 @@ class DiscoveryResult(object): return self.__str__() @classmethod - def from_response(cls, rpc_state, response, use_all_nodes=False): + def from_response( + cls, + rpc_state: conn_impl._RpcState, + response: ydb_discovery_pb2.ListEndpointsResponse, + use_all_nodes: bool = False, + ) -> DiscoveryResult: issues._process_response(response.operation) message = _apis.ydb_discovery.ListEndpointsResult() response.operation.result.Unpack(message) @@ -123,7 +139,7 @@ class DiscoveryResult(object): class DiscoveryEndpointsResolver(object): - def __init__(self, driver_config): + def __init__(self, driver_config: driver.DriverConfig): self.logger = logger.getChild(self.__class__.__name__) self._driver_config = driver_config self._ready_timeout = getattr(self._driver_config, "discovery_request_timeout", 10) @@ -136,7 +152,7 @@ class DiscoveryEndpointsResolver(object): random.shuffle(self._endpoints) self._endpoints_iter = itertools.cycle(self._endpoints) - def _add_debug_details(self, message, *args): + def _add_debug_details(self, message: str, *args): self.logger.debug(message, *args) message = message % args with self._lock: @@ -144,19 +160,19 @@ class DiscoveryEndpointsResolver(object): if len(self._debug_details_items) > self._debug_details_history_size: self._debug_details_items.pop() - def debug_details(self): + def debug_details(self) -> str: """ Returns last resolver errors as a debug string. """ with self._lock: return "\n".join(self._debug_details_items) - def resolve(self): + def resolve(self) -> typing.ContextManager[typing.Optional[DiscoveryResult]]: with self.context_resolve() as result: return result @contextlib.contextmanager - def context_resolve(self): + def context_resolve(self) -> typing.ContextManager[typing.Optional[DiscoveryResult]]: self.logger.debug("Preparing initial endpoint to resolve endpoints") endpoint = next(self._endpoints_iter) initial = conn_impl.Connection.ready_factory(endpoint, self._driver_config, ready_timeout=self._ready_timeout) diff --git a/ydb/public/sdk/python3/ydb/topic.py b/ydb/public/sdk/python3/ydb/topic.py index 00ffb1c479..2175af47f7 100644 --- a/ydb/public/sdk/python3/ydb/topic.py +++ b/ydb/public/sdk/python3/ydb/topic.py @@ -170,8 +170,11 @@ class TopicClientAsyncIO: consumer: str, buffer_size_bytes: int = 50 * 1024 * 1024, # decoders: map[codec_code] func(encoded_bytes)->decoded_bytes + # the func will be called from multiply threads in parallel decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None, - decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool + # custom decoder executor for call builtin and custom decoders. If None - use shared executor pool. + # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel + decoder_executor: Optional[concurrent.futures.Executor] = None, ) -> TopicReaderAsyncIO: if not decoder_executor: @@ -194,8 +197,12 @@ class TopicClientAsyncIO: auto_seqno: bool = True, auto_created_at: bool = True, codec: Optional[TopicCodec] = None, # default mean auto-select + # encoders: map[codec_code] func(encoded_bytes)->decoded_bytes + # the func will be called from multiply threads in parallel. encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None, - encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool + # custom encoder executor for call builtin and custom decoders. If None - use shared executor pool. + # If max_worker in the executor is 1 - then encoders will be called from the thread without parallel. + encoder_executor: Optional[concurrent.futures.Executor] = None, ) -> TopicWriterAsyncIO: args = locals().copy() del args["self"] @@ -319,7 +326,10 @@ class TopicClient: consumer: str, buffer_size_bytes: int = 50 * 1024 * 1024, # decoders: map[codec_code] func(encoded_bytes)->decoded_bytes + # the func will be called from multiply threads in parallel decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None, + # custom decoder executor for call builtin and custom decoders. If None - use shared executor pool. + # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool ) -> TopicReader: if not decoder_executor: @@ -343,7 +353,11 @@ class TopicClient: auto_seqno: bool = True, auto_created_at: bool = True, codec: Optional[TopicCodec] = None, # default mean auto-select + # encoders: map[codec_code] func(encoded_bytes)->decoded_bytes + # the func will be called from multiply threads in parallel. encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None, + # custom encoder executor for call builtin and custom decoders. If None - use shared executor pool. + # If max_worker in the executor is 1 - then encoders will be called from the thread without parallel. encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool ) -> TopicWriter: args = locals().copy() @@ -373,7 +387,11 @@ class TopicClient: @dataclass class TopicClientSettings: - encode_decode_threads_count: int = 4 + # ATTENTION + # When set the encode_decode_threads_count - all custom encoders/decoders for topic reader/writer + # MUST be thread-safe + # because they will be called from parallel threads + encode_decode_threads_count: int = 1 class TopicError(issues.Error): diff --git a/ydb/public/sdk/python3/ydb/types.py b/ydb/public/sdk/python3/ydb/types.py index 9771e037f7..cf13aac0a2 100644 --- a/ydb/public/sdk/python3/ydb/types.py +++ b/ydb/public/sdk/python3/ydb/types.py @@ -1,76 +1,94 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import abc import enum import json from . import _utilities, _apis from datetime import date, datetime, timedelta +import typing import uuid import struct from google.protobuf import struct_pb2 +from . import table + + +# Workaround for good IDE and universal for runtime +if typing.TYPE_CHECKING: + from ._grpc.v4.protos import ydb_value_pb2 +else: + from ._grpc.common.protos import ydb_value_pb2 + _SECONDS_IN_DAY = 60 * 60 * 24 _EPOCH = datetime(1970, 1, 1) -def _from_date(x, table_client_settings): +def _from_date(x: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings) -> typing.Union[date, int]: if table_client_settings is not None and table_client_settings._native_date_in_result_sets: return _EPOCH.date() + timedelta(days=x.uint32_value) return x.uint32_value -def _to_date(pb, value): +def _to_date(pb: ydb_value_pb2.Value, value: typing.Union[date, int]) -> None: if isinstance(value, date): pb.uint32_value = (value - _EPOCH.date()).days else: pb.uint32_value = value -def _from_datetime_number(x, table_client_settings): +def _from_datetime_number( + x: typing.Union[float, datetime], table_client_settings: table.TableClientSettings +) -> datetime: if table_client_settings is not None and table_client_settings._native_datetime_in_result_sets: return datetime.utcfromtimestamp(x) return x -def _from_json(x, table_client_settings): +def _from_json(x: typing.Union[str, bytearray, bytes], table_client_settings: table.TableClientSettings): if table_client_settings is not None and table_client_settings._native_json_in_result_sets: return json.loads(x) return x -def _to_uuid(value_pb, table_client_settings): +def _to_uuid(value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings) -> uuid.UUID: return uuid.UUID(bytes_le=struct.pack("QQ", value_pb.low_128, value_pb.high_128)) -def _from_uuid(pb, value): +def _from_uuid(pb: ydb_value_pb2.Value, value: uuid.UUID): pb.low_128 = struct.unpack("Q", value.bytes_le[0:8])[0] pb.high_128 = struct.unpack("Q", value.bytes_le[8:16])[0] -def _from_interval(value_pb, table_client_settings): +def _from_interval( + value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings +) -> typing.Union[timedelta, int]: if table_client_settings is not None and table_client_settings._native_interval_in_result_sets: return timedelta(microseconds=value_pb.int64_value) return value_pb.int64_value -def _timedelta_to_microseconds(value): +def _timedelta_to_microseconds(value: timedelta) -> int: return (value.days * _SECONDS_IN_DAY + value.seconds) * 1000000 + value.microseconds -def _to_interval(pb, value): +def _to_interval(pb: ydb_value_pb2.Value, value: typing.Union[timedelta, int]): if isinstance(value, timedelta): pb.int64_value = _timedelta_to_microseconds(value) else: pb.int64_value = value -def _from_timestamp(value_pb, table_client_settings): +def _from_timestamp( + value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings +) -> typing.Union[datetime, int]: if table_client_settings is not None and table_client_settings._native_timestamp_in_result_sets: return _EPOCH + timedelta(microseconds=value_pb.uint64_value) return value_pb.uint64_value -def _to_timestamp(pb, value): +def _to_timestamp(pb: ydb_value_pb2.Value, value: typing.Union[datetime, int]): if isinstance(value, datetime): pb.uint64_value = _timedelta_to_microseconds(value - _EPOCH) else: @@ -129,13 +147,15 @@ class PrimitiveType(enum.Enum): DyNumber = _apis.primitive_types.DYNUMBER, "text_value" - def __init__(self, idn, proto_field, to_obj=None, from_obj=None): + def __init__( + self, idn: ydb_value_pb2.Type.PrimitiveTypeId, proto_field: typing.Optional[str], to_obj=None, from_obj=None + ): self._idn_ = idn self._to_obj = to_obj self._from_obj = from_obj self._proto_field = proto_field - def get_value(self, value_pb, table_client_settings): + def get_value(self, value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings): """ Extracts value from protocol buffer :param value_pb: A protocol buffer @@ -149,7 +169,7 @@ class PrimitiveType(enum.Enum): return getattr(value_pb, self._proto_field) - def set_value(self, pb, value): + def set_value(self, pb: ydb_value_pb2.Value, value): """ Sets value in a protocol buffer :param pb: A protocol buffer @@ -176,7 +196,9 @@ class PrimitiveType(enum.Enum): class DataQuery(object): __slots__ = ("yql_text", "parameters_types", "name") - def __init__(self, query_id, parameters_types, name=None): + def __init__( + self, query_id: str, parameters_types: "dict[str, ydb_value_pb2.Type]", name: typing.Optional[str] = None + ): self.yql_text = query_id self.parameters_types = parameters_types self.name = _utilities.get_query_hash(self.yql_text) if name is None else name @@ -259,7 +281,7 @@ class NullType(AbstractTypeBuilder): class OptionalType(AbstractTypeBuilder): __slots__ = ("_repr", "_proto", "_item") - def __init__(self, optional_type): + def __init__(self, optional_type: typing.Union[AbstractTypeBuilder, PrimitiveType]): """ Represents optional type that wraps inner type :param optional_type: An instance of an inner type @@ -291,7 +313,7 @@ class OptionalType(AbstractTypeBuilder): class ListType(AbstractTypeBuilder): __slots__ = ("_repr", "_proto") - def __init__(self, list_type): + def __init__(self, list_type: typing.Union[AbstractTypeBuilder, PrimitiveType]): """ :param list_type: List item type builder """ @@ -313,7 +335,11 @@ class ListType(AbstractTypeBuilder): class DictType(AbstractTypeBuilder): __slots__ = ("__repr", "__proto") - def __init__(self, key_type, payload_type): + def __init__( + self, + key_type: typing.Union[AbstractTypeBuilder, PrimitiveType], + payload_type: typing.Union[AbstractTypeBuilder, PrimitiveType], + ): """ :param key_type: Key type builder :param payload_type: Payload type builder @@ -341,7 +367,7 @@ class TupleType(AbstractTypeBuilder): self.__elements_repr = [] self.__proto = _apis.ydb_value.Type(tuple_type=_apis.ydb_value.TupleType()) - def add_element(self, element_type): + def add_element(self, element_type: typing.Union[AbstractTypeBuilder, PrimitiveType]): """ :param element_type: Adds additional element of tuple :return: self @@ -366,7 +392,7 @@ class StructType(AbstractTypeBuilder): self.__members_repr = [] self.__proto = _apis.ydb_value.Type(struct_type=_apis.ydb_value.StructType()) - def add_member(self, name, member_type): + def add_member(self, name: str, member_type: typing.Union[AbstractTypeBuilder, PrimitiveType]): """ :param name: :param member_type: @@ -393,7 +419,7 @@ class BulkUpsertColumns(AbstractTypeBuilder): self.__columns_repr = [] self.__proto = _apis.ydb_value.Type(struct_type=_apis.ydb_value.StructType()) - def add_column(self, name, column_type): + def add_column(self, name: str, column_type: typing.Union[AbstractTypeBuilder, PrimitiveType]): """ :param name: A column name :param column_type: A column type diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py index 46c424d30a..709cb0a9e5 100644 --- a/ydb/public/sdk/python3/ydb/ydb_version.py +++ b/ydb/public/sdk/python3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.5.0" +VERSION = "3.7.0" |