diff options
author | robot-kikimr-dev <robot-kikimr-dev@yandex-team.com> | 2022-08-10 09:47:27 +0300 |
---|---|---|
committer | robot-kikimr-dev <robot-kikimr-dev@yandex-team.com> | 2022-08-10 09:47:27 +0300 |
commit | 807c3abb5fde940cb3c6c1a3848e56b3e4bf1c7d (patch) | |
tree | f9f9c6d1de04210f13625ac2d48f80e0ab1ff5f8 | |
parent | 432e33c7f18f5b6c2c253eec913d041bad139390 (diff) | |
download | ydb-807c3abb5fde940cb3c6c1a3848e56b3e4bf1c7d.tar.gz |
Sync YDB SDK from github
Release YDB recipe binaries
-rw-r--r-- | ydb/public/sdk/python/ydb/_utilities.py | 30 | ||||
-rw-r--r-- | ydb/public/sdk/python/ydb/aio/credentials.py | 113 | ||||
-rw-r--r-- | ydb/public/sdk/python/ydb/aio/iam.py | 133 | ||||
-rw-r--r-- | ydb/public/sdk/python/ydb/aio/table.py | 20 | ||||
-rw-r--r-- | ydb/public/sdk/python/ydb/connection.py | 3 | ||||
-rw-r--r-- | ydb/public/sdk/python/ydb/credentials.py | 183 | ||||
-rw-r--r-- | ydb/public/sdk/python/ydb/driver.py | 38 | ||||
-rw-r--r-- | ydb/public/sdk/python/ydb/iam/auth.py | 175 | ||||
-rw-r--r-- | ydb/public/sdk/python/ydb/settings.py | 7 | ||||
-rw-r--r-- | ydb/public/sdk/python/ydb/table.py | 12 | ||||
-rw-r--r-- | ydb/public/sdk/python/ydb/types.py | 57 | ||||
-rw-r--r-- | ydb/public/sdk/python/ydb/ydb_version.py | 2 |
12 files changed, 446 insertions, 327 deletions
diff --git a/ydb/public/sdk/python/ydb/_utilities.py b/ydb/public/sdk/python/ydb/_utilities.py index 765a694c4f..32419b1bf9 100644 --- a/ydb/public/sdk/python/ydb/_utilities.py +++ b/ydb/public/sdk/python/ydb/_utilities.py @@ -13,6 +13,10 @@ except ImportError: interceptor = None +_grpcs_protocol = "grpcs://" +_grpc_protocol = "grpc://" + + def wrap_result_in_future(result): f = futures.Future() f.set_result(result) @@ -33,6 +37,32 @@ def x_ydb_sdk_build_info_header(): return ("x-ydb-sdk-build-info", "ydb-python-sdk/" + ydb_version.VERSION) +def is_secure_protocol(endpoint): + return endpoint.startswith("grpcs://") + + +def wrap_endpoint(endpoint): + if endpoint.startswith(_grpcs_protocol): + return endpoint[len(_grpcs_protocol) :] + if endpoint.startswith(_grpc_protocol): + return endpoint[len(_grpc_protocol) :] + return endpoint + + +def parse_connection_string(connection_string): + cs = connection_string + if not cs.startswith(_grpc_protocol) and not cs.startswith(_grpcs_protocol): + # default is grpcs + cs = _grpcs_protocol + cs + + p = six.moves.urllib.parse.urlparse(connection_string) + b = six.moves.urllib.parse.parse_qs(p.query) + database = b.get("database", []) + assert len(database) > 0 + + return p.scheme + "://" + p.netloc, database[0] + + # Decorator that ensures no exceptions are leaked from decorated async call def wrap_async_call_exceptions(f): @functools.wraps(f) diff --git a/ydb/public/sdk/python/ydb/aio/credentials.py b/ydb/public/sdk/python/ydb/aio/credentials.py new file mode 100644 index 0000000000..e98404407b --- /dev/null +++ b/ydb/public/sdk/python/ydb/aio/credentials.py @@ -0,0 +1,113 @@ +import time + +import abc +import asyncio +import logging +import six +from ydb import issues, credentials + +logger = logging.getLogger(__name__) + + +class _OneToManyValue(object): + def __init__(self): + self._value = None + self._condition = asyncio.Condition() + + async def consume(self, timeout=3): + async with self._condition: + if self._value is None: + try: + await asyncio.wait_for(self._condition.wait(), timeout=timeout) + except Exception: + return self._value + return self._value + + async def update(self, n_value): + async with self._condition: + prev_value = self._value + self._value = n_value + if prev_value is None: + self._condition.notify_all() + + +class _AtMostOneExecution(object): + def __init__(self): + self._can_schedule = True + self._lock = asyncio.Lock() # Lock to guarantee only one execution + + async def _wrapped_execution(self, callback): + await self._lock.acquire() + try: + res = callback() + if asyncio.iscoroutine(res): + await res + except Exception: + pass + + finally: + self._lock.release() + self._can_schedule = True + + def submit(self, callback): + if self._can_schedule: + self._can_schedule = False + asyncio.ensure_future(self._wrapped_execution(callback)) + + +@six.add_metaclass(abc.ABCMeta) +class AbstractExpiringTokenCredentials(credentials.AbstractExpiringTokenCredentials): + def __init__(self): + super(AbstractExpiringTokenCredentials, self).__init__() + self._tp = _AtMostOneExecution() + self._cached_token = _OneToManyValue() + + @abc.abstractmethod + async def _make_token_request(self): + pass + + async def _refresh(self): + current_time = time.time() + self._log_refresh_start(current_time) + + try: + auth_metadata = await self._make_token_request() + await self._cached_token.update(auth_metadata["access_token"]) + self.update_expiration_info(auth_metadata) + self.logger.info( + "Token refresh successful. current_time %s, refresh_in %s", + current_time, + self._refresh_in, + ) + + except (KeyboardInterrupt, SystemExit): + return + + except Exception as e: + self.last_error = str(e) + await asyncio.sleep(1) + self._tp.submit(self._refresh) + + async def token(self): + current_time = time.time() + if current_time > self._refresh_in: + self._tp.submit(self._refresh) + + cached_token = await self._cached_token.consume(timeout=3) + if cached_token is None: + if self.last_error is None: + raise issues.ConnectionError( + "%s: timeout occurred while waiting for token.\n%s" + % ( + self.__class__.__name__, + self.extra_error_message, + ) + ) + raise issues.ConnectionError( + "%s: %s.\n%s" + % (self.__class__.__name__, self.last_error, self.extra_error_message) + ) + return cached_token + + async def auth_metadata(self): + return [(credentials.YDB_AUTH_TICKET_HEADER, await self.token())] diff --git a/ydb/public/sdk/python/ydb/aio/iam.py b/ydb/public/sdk/python/ydb/aio/iam.py index bf06340df6..a2da48b782 100644 --- a/ydb/public/sdk/python/ydb/aio/iam.py +++ b/ydb/public/sdk/python/ydb/aio/iam.py @@ -2,11 +2,10 @@ import grpc.aio import time import abc -import asyncio import logging import six -from ydb import issues, credentials from ydb.iam import auth +from .credentials import AbstractExpiringTokenCredentials logger = logging.getLogger(__name__) @@ -25,127 +24,20 @@ except ImportError: aiohttp = None -class _OneToManyValue(object): - def __init__(self): - self._value = None - self._condition = asyncio.Condition() - - async def consume(self, timeout=3): - async with self._condition: - if self._value is None: - try: - await asyncio.wait_for(self._condition.wait(), timeout=timeout) - except Exception: - return self._value - return self._value - - async def update(self, n_value): - async with self._condition: - prev_value = self._value - self._value = n_value - if prev_value is None: - self._condition.notify_all() - - -class _AtMostOneExecution(object): - def __init__(self): - self._can_schedule = True - self._lock = asyncio.Lock() # Lock to guarantee only one execution - - async def _wrapped_execution(self, callback): - await self._lock.acquire() - try: - res = callback() - if asyncio.iscoroutine(res): - await res - except Exception: - pass - - finally: - self._lock.release() - self._can_schedule = True - - def submit(self, callback): - if self._can_schedule: - self._can_schedule = False - asyncio.ensure_future(self._wrapped_execution(callback)) - - @six.add_metaclass(abc.ABCMeta) -class IamTokenCredentials(auth.IamTokenCredentials): - def __init__(self): - super(IamTokenCredentials, self).__init__() - self._tp = _AtMostOneExecution() - self._iam_token = _OneToManyValue() - - @abc.abstractmethod - async def _get_iam_token(self): - pass - - async def _refresh(self): - current_time = time.time() - self._log_refresh_start(current_time) - - try: - auth_metadata = await self._get_iam_token() - await self._iam_token.update(auth_metadata["access_token"]) - self.update_expiration_info(auth_metadata) - self.logger.info( - "Token refresh successful. current_time %s, refresh_in %s", - current_time, - self._refresh_in, - ) - - except (KeyboardInterrupt, SystemExit): - return - - except Exception as e: - self.last_error = str(e) - await asyncio.sleep(1) - self._tp.submit(self._refresh) - - async def iam_token(self): - current_time = time.time() - if current_time > self._refresh_in: - self._tp.submit(self._refresh) - - iam_token = await self._iam_token.consume(timeout=3) - if iam_token is None: - if self.last_error is None: - raise issues.ConnectionError( - "%s: timeout occurred while waiting for token.\n%s" - % self.__class__.__name__, - self.extra_error_message, - ) - raise issues.ConnectionError( - "%s: %s.\n%s" - % (self.__class__.__name__, self.last_error, self.extra_error_message) - ) - return iam_token - - async def auth_metadata(self): - return [(credentials.YDB_AUTH_TICKET_HEADER, await self.iam_token())] - - -@six.add_metaclass(abc.ABCMeta) -class TokenServiceCredentials(IamTokenCredentials): +class TokenServiceCredentials(AbstractExpiringTokenCredentials): def __init__(self, iam_endpoint=None, iam_channel_credentials=None): super(TokenServiceCredentials, self).__init__() + assert ( + iam_token_service_pb2_grpc is not None + ), "run pip install==ydb[yc] to use service account credentials" + self._get_token_request_timeout = 10 self._iam_endpoint = ( "iam.api.cloud.yandex.net:443" if iam_endpoint is None else iam_endpoint ) self._iam_channel_credentials = ( {} if iam_channel_credentials is None else iam_channel_credentials ) - self._get_token_request_timeout = 10 - if ( - iam_token_service_pb2_grpc is None - or jwt is None - or iam_token_service_pb2 is None - ): - raise RuntimeError( - "Install jwt & yandex python cloud library to use service account credentials provider" - ) def _channel_factory(self): return grpc.aio.secure_channel( @@ -157,7 +49,7 @@ class TokenServiceCredentials(IamTokenCredentials): def _get_token_request(self): pass - async def _get_iam_token(self): + async def _make_token_request(self): async with self._channel_factory() as channel: stub = iam_token_service_pb2_grpc.IamTokenServiceStub(channel) response = await stub.Create( @@ -209,20 +101,19 @@ class YandexPassportOAuthIamCredentials(TokenServiceCredentials): ) -class MetadataUrlCredentials(IamTokenCredentials): +class MetadataUrlCredentials(AbstractExpiringTokenCredentials): def __init__(self, metadata_url=None): super(MetadataUrlCredentials, self).__init__() - if aiohttp is None: - raise RuntimeError( - "Install aiohttp library to use metadata credentials provider" - ) + assert ( + aiohttp is not None + ), "Install aiohttp library to use metadata credentials provider" self._metadata_url = ( auth.DEFAULT_METADATA_URL if metadata_url is None else metadata_url ) self._tp.submit(self._refresh) self.extra_error_message = "Check that metadata service configured properly and application deployed in VM or function at Yandex.Cloud." - async def _get_iam_token(self): + async def _make_token_request(self): timeout = aiohttp.ClientTimeout(total=2) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get( diff --git a/ydb/public/sdk/python/ydb/aio/table.py b/ydb/public/sdk/python/ydb/aio/table.py index f5e4fd9796..9df797eaf0 100644 --- a/ydb/public/sdk/python/ydb/aio/table.py +++ b/ydb/public/sdk/python/ydb/aio/table.py @@ -54,7 +54,10 @@ class Session(BaseSession): return await super().keep_alive(settings) async def create(self, settings=None): # pylint: disable=W0236 - return await super().create(settings) + res = super().create(settings) + if asyncio.iscoroutine(res): + res = await res + return res async def delete(self, settings=None): # pylint: disable=W0236 return await super().delete(settings) @@ -184,13 +187,22 @@ class TxContext(BaseTxContext): return await super().execute(query, parameters, commit_tx, settings) async def commit(self, settings=None): # pylint: disable=W0236 - return await super().commit(settings) + res = super().commit(settings) + if asyncio.iscoroutine(res): + res = await res + return res async def rollback(self, settings=None): # pylint: disable=W0236 - return await super().rollback(settings) + res = super().rollback(settings) + if asyncio.iscoroutine(res): + res = await res + return res async def begin(self, settings=None): # pylint: disable=W0236 - return await super().begin(settings) + res = super().begin(settings) + if asyncio.iscoroutine(res): + res = await res + return res async def retry_operation( diff --git a/ydb/public/sdk/python/ydb/connection.py b/ydb/public/sdk/python/ydb/connection.py index ed86de4479..95db084a3c 100644 --- a/ydb/public/sdk/python/ydb/connection.py +++ b/ydb/public/sdk/python/ydb/connection.py @@ -138,7 +138,8 @@ def _construct_metadata(driver_config, settings): if driver_config.database is not None: metadata.append((YDB_DATABASE_HEADER, driver_config.database)) - if driver_config.credentials is not None: + need_rpc_auth = getattr(settings, "need_rpc_auth", True) + if driver_config.credentials is not None and need_rpc_auth: metadata.extend(driver_config.credentials.auth_metadata()) if settings is not None: diff --git a/ydb/public/sdk/python/ydb/credentials.py b/ydb/public/sdk/python/ydb/credentials.py index a291258db2..96247ef32e 100644 --- a/ydb/public/sdk/python/ydb/credentials.py +++ b/ydb/public/sdk/python/ydb/credentials.py @@ -1,9 +1,18 @@ # -*- coding: utf-8 -*- import abc import six -from . import tracing +from . import tracing, issues, connection +from . import settings as settings_impl +import threading +from concurrent import futures +import logging +import time +from ydb.public.api.protos import ydb_auth_pb2 +from ydb.public.api.grpc import ydb_auth_v1_pb2_grpc + YDB_AUTH_TICKET_HEADER = "x-ydb-auth-ticket" +logger = logging.getLogger(__name__) @six.add_metaclass(abc.ABCMeta) @@ -26,6 +35,178 @@ class Credentials(object): pass +class OneToManyValue(object): + def __init__(self): + self._value = None + self._condition = threading.Condition() + + def consume(self, timeout=3): + with self._condition: + if self._value is None: + self._condition.wait(timeout=timeout) + return self._value + + def update(self, n_value): + with self._condition: + prev_value = self._value + self._value = n_value + if prev_value is None: + self._condition.notify_all() + + +class AtMostOneExecution(object): + def __init__(self): + self._can_schedule = True + self._lock = threading.Lock() + self._tp = futures.ThreadPoolExecutor(1) + + def wrapped_execution(self, callback): + try: + callback() + except Exception: + pass + + finally: + self.cleanup() + + def submit(self, callback): + with self._lock: + if self._can_schedule: + self._tp.submit(self.wrapped_execution, callback) + self._can_schedule = False + + def cleanup(self): + with self._lock: + self._can_schedule = True + + +@six.add_metaclass(abc.ABCMeta) +class AbstractExpiringTokenCredentials(Credentials): + def __init__(self, tracer=None): + super(AbstractExpiringTokenCredentials, self).__init__(tracer) + self._expires_in = 0 + self._refresh_in = 0 + self._hour = 60 * 60 + self._cached_token = OneToManyValue() + self._tp = AtMostOneExecution() + self.logger = logger.getChild(self.__class__.__name__) + self.last_error = None + self.extra_error_message = "" + + @abc.abstractmethod + def _make_token_request(self): + pass + + def _log_refresh_start(self, current_time): + self.logger.debug("Start refresh token from metadata") + if current_time > self._refresh_in: + self.logger.info( + "Cached token reached refresh_in deadline, current time %s, deadline %s", + current_time, + self._refresh_in, + ) + + if current_time > self._expires_in and self._expires_in > 0: + self.logger.error( + "Cached token reached expires_in deadline, current time %s, deadline %s", + current_time, + self._expires_in, + ) + + def _update_expiration_info(self, auth_metadata): + self._expires_in = time.time() + min( + self._hour, auth_metadata["expires_in"] / 2 + ) + self._refresh_in = time.time() + min( + self._hour / 2, auth_metadata["expires_in"] / 4 + ) + + def _refresh(self): + current_time = time.time() + self._log_refresh_start(current_time) + try: + token_response = self._make_token_request() + self._cached_token.update(token_response["access_token"]) + self._update_expiration_info(token_response) + self.logger.info( + "Token refresh successful. current_time %s, refresh_in %s", + current_time, + self._refresh_in, + ) + + except (KeyboardInterrupt, SystemExit): + return + + except Exception as e: + self.last_error = str(e) + time.sleep(1) + self._tp.submit(self._refresh) + + @property + @tracing.with_trace() + def token(self): + current_time = time.time() + if current_time > self._refresh_in: + tracing.trace(self.tracer, {"refresh": True}) + self._tp.submit(self._refresh) + cached_token = self._cached_token.consume(timeout=3) + tracing.trace(self.tracer, {"consumed": True}) + if cached_token is None: + if self.last_error is None: + raise issues.ConnectionError( + "%s: timeout occurred while waiting for token.\n%s" + % ( + self.__class__.__name__, + self.extra_error_message, + ) + ) + raise issues.ConnectionError( + "%s: %s.\n%s" + % (self.__class__.__name__, self.last_error, self.extra_error_message) + ) + return cached_token + + def auth_metadata(self): + return [(YDB_AUTH_TICKET_HEADER, self.token)] + + +def _wrap_static_credentials_response(rpc_state, response): + issues._process_response(response.operation) + result = ydb_auth_pb2.LoginResult() + response.operation.result.Unpack(result) + return result + + +class StaticCredentials(AbstractExpiringTokenCredentials): + def __init__(self, driver_config, user, password="", tracer=None): + super(StaticCredentials, self).__init__(tracer) + self.driver_config = driver_config + self.user = user + self.password = password + self.request_timeout = 10 + + def _make_token_request(self): + conn = connection.Connection.ready_factory( + self.driver_config.endpoint, self.driver_config + ) + assert conn is not None, ( + "Failed to establish connection in to %s" % self.driver_config.endpoint + ) + try: + result = conn( + ydb_auth_pb2.LoginRequest(user=self.user, password=self.password), + ydb_auth_v1_pb2_grpc.AuthServiceStub, + "Login", + _wrap_static_credentials_response, + settings_impl.BaseRequestSettings() + .with_timeout(self.request_timeout) + .with_need_rpc_auth(False), + ) + finally: + conn.close() + return {"expires_in": 30 * 60, "access_token": result.token} + + class AnonymousCredentials(Credentials): @staticmethod def auth_metadata(): diff --git a/ydb/public/sdk/python/ydb/driver.py b/ydb/public/sdk/python/ydb/driver.py index da300373c6..9b3fa99cfa 100644 --- a/ydb/public/sdk/python/ydb/driver.py +++ b/ydb/public/sdk/python/ydb/driver.py @@ -4,41 +4,13 @@ from . import tracing import six import os import grpc +from . import _utilities if six.PY2: Any = None else: from typing import Any # noqa -_grpcs_protocol = "grpcs://" -_grpc_protocol = "grpc://" - - -def is_secure_protocol(endpoint): - return endpoint.startswith("grpcs://") - - -def wrap_endpoint(endpoint): - if endpoint.startswith(_grpcs_protocol): - return endpoint[len(_grpcs_protocol) :] - if endpoint.startswith(_grpc_protocol): - return endpoint[len(_grpc_protocol) :] - return endpoint - - -def parse_connection_string(connection_string): - cs = connection_string - if not cs.startswith(_grpc_protocol) and not cs.startswith(_grpcs_protocol): - # default is grpcs - cs = _grpcs_protocol + cs - - p = six.moves.urllib.parse.urlparse(connection_string) - b = six.moves.urllib.parse.parse_qs(p.query) - database = b.get("database", []) - assert len(database) > 0 - - return p.scheme + "://" + p.netloc, database[0] - class RPCCompression: """Indicates the compression method to be used for an RPC.""" @@ -152,11 +124,11 @@ class DriverConfig(object): self.database = database self.ca_cert = ca_cert self.channel_options = channel_options - self.secure_channel = is_secure_protocol(endpoint) - self.endpoint = wrap_endpoint(self.endpoint) + self.secure_channel = _utilities.is_secure_protocol(endpoint) + self.endpoint = _utilities.wrap_endpoint(self.endpoint) self.endpoints = [] if endpoints is not None: - self.endpoints = [wrap_endpoint(endp) for endp in endpoints] + self.endpoints = [_utilities.wrap_endpoint(endp) for endp in endpoints] if auth_token is not None: credentials = credentials_impl.AuthTokenCredentials(auth_token) self.credentials = credentials @@ -192,7 +164,7 @@ class DriverConfig(object): def default_from_connection_string( cls, connection_string, root_certificates=None, credentials=None, **kwargs ): - endpoint, database = parse_connection_string(connection_string) + endpoint, database = _utilities.parse_connection_string(connection_string) return cls( endpoint, database, diff --git a/ydb/public/sdk/python/ydb/iam/auth.py b/ydb/public/sdk/python/ydb/iam/auth.py index 5bc59ef3c0..06b07e917e 100644 --- a/ydb/public/sdk/python/ydb/iam/auth.py +++ b/ydb/public/sdk/python/ydb/iam/auth.py @@ -6,13 +6,7 @@ import abc import six from datetime import datetime import json -import threading -from concurrent import futures import os -import logging -from ydb import issues - -logger = logging.getLogger(__name__) try: from yandex.cloud.iam.v1 import iam_token_service_pb2_grpc @@ -51,161 +45,22 @@ def get_jwt(account_id, access_key_id, private_key, jwt_expiration_timeout): ) -class OneToManyValue(object): - def __init__(self): - self._value = None - self._condition = threading.Condition() - - def consume(self, timeout=3): - with self._condition: - if self._value is None: - self._condition.wait(timeout=timeout) - return self._value - - def update(self, n_value): - with self._condition: - prev_value = self._value - self._value = n_value - if prev_value is None: - self._condition.notify_all() - - -class AtMostOneExecution(object): - def __init__(self): - self._can_schedule = True - self._lock = threading.Lock() - self._tp = futures.ThreadPoolExecutor(1) - - def wrapped_execution(self, callback): - try: - callback() - except Exception: - pass - - finally: - self.cleanup() - - def submit(self, callback): - with self._lock: - if self._can_schedule: - self._tp.submit(self.wrapped_execution, callback) - self._can_schedule = False - - def cleanup(self): - with self._lock: - self._can_schedule = True - - -@six.add_metaclass(abc.ABCMeta) -class IamTokenCredentials(credentials.Credentials): - def __init__(self, tracer=None): - super(IamTokenCredentials, self).__init__(tracer) - self._expires_in = 0 - self._refresh_in = 0 - self._hour = 60 * 60 - self._iam_token = OneToManyValue() - self._tp = AtMostOneExecution() - self.logger = logger.getChild(self.__class__.__name__) - self.last_error = None - self.extra_error_message = "" - - @abc.abstractmethod - def _get_iam_token(self): - pass - - def _log_refresh_start(self, current_time): - self.logger.debug("Start refresh token from metadata") - if current_time > self._refresh_in: - self.logger.info( - "Cached token reached refresh_in deadline, current time %s, deadline %s", - current_time, - self._refresh_in, - ) - - if current_time > self._expires_in and self._expires_in > 0: - self.logger.error( - "Cached token reached expires_in deadline, current time %s, deadline %s", - current_time, - self._expires_in, - ) - - def _update_expiration_info(self, auth_metadata): - self._expires_in = time.time() + min( - self._hour, auth_metadata["expires_in"] / 2 - ) - self._refresh_in = time.time() + min( - self._hour / 2, auth_metadata["expires_in"] / 4 - ) - - def _refresh(self): - current_time = time.time() - self._log_refresh_start(current_time) - try: - auth_metadata = self._get_iam_token() - self._iam_token.update(auth_metadata["access_token"]) - self._update_expiration_info(auth_metadata) - self.logger.info( - "Token refresh successful. current_time %s, refresh_in %s", - current_time, - self._refresh_in, - ) - - except (KeyboardInterrupt, SystemExit): - return - - except Exception as e: - self.last_error = str(e) - time.sleep(1) - self._tp.submit(self._refresh) - - @property - @tracing.with_trace() - def iam_token(self): - current_time = time.time() - if current_time > self._refresh_in: - tracing.trace(self.tracer, {"refresh": True}) - self._tp.submit(self._refresh) - iam_token = self._iam_token.consume(timeout=3) - tracing.trace(self.tracer, {"consumed": True}) - if iam_token is None: - if self.last_error is None: - raise issues.ConnectionError( - "%s: timeout occurred while waiting for token.\n%s" - % self.__class__.__name__, - self.extra_error_message, - ) - raise issues.ConnectionError( - "%s: %s.\n%s" - % (self.__class__.__name__, self.last_error, self.extra_error_message) - ) - return iam_token - - def auth_metadata(self): - return [(credentials.YDB_AUTH_TICKET_HEADER, self.iam_token)] - - @six.add_metaclass(abc.ABCMeta) -class TokenServiceCredentials(IamTokenCredentials): +class TokenServiceCredentials(credentials.AbstractExpiringTokenCredentials): def __init__(self, iam_endpoint=None, iam_channel_credentials=None, tracer=None): super(TokenServiceCredentials, self).__init__(tracer) + assert ( + iam_token_service_pb2_grpc is not None + ), "run pip install==ydb[yc] to use service account credentials" + self._get_token_request_timeout = 10 + self._iam_token_service_pb2 = iam_token_service_pb2 + self._iam_token_service_pb2_grpc = iam_token_service_pb2_grpc self._iam_endpoint = ( "iam.api.cloud.yandex.net:443" if iam_endpoint is None else iam_endpoint ) self._iam_channel_credentials = ( {} if iam_channel_credentials is None else iam_channel_credentials ) - self._get_token_request_timeout = 10 - if ( - iam_token_service_pb2_grpc is None - or jwt is None - or iam_token_service_pb2 is None - ): - raise RuntimeError( - "Install jwt & yandex python cloud library to use service account credentials provider" - ) - - self._iam_token_service_pb2 = iam_token_service_pb2 - self._iam_token_service_pb2_grpc = iam_token_service_pb2_grpc def _channel_factory(self): return grpc.secure_channel( @@ -218,7 +73,7 @@ class TokenServiceCredentials(IamTokenCredentials): pass @tracing.with_trace() - def _get_iam_token(self): + def _make_token_request(self): with self._channel_factory() as channel: tracing.trace(self.tracer, {"iam_token.from_service": True}) stub = self._iam_token_service_pb2_grpc.IamTokenServiceStub(channel) @@ -299,26 +154,24 @@ class YandexPassportOAuthIamCredentials(TokenServiceCredentials): ) -class MetadataUrlCredentials(IamTokenCredentials): +class MetadataUrlCredentials(credentials.AbstractExpiringTokenCredentials): def __init__(self, metadata_url=None, tracer=None): """ - :param metadata_url: Metadata url :param ydb.Tracer tracer: ydb tracer """ super(MetadataUrlCredentials, self).__init__(tracer) - if requests is None: - raise RuntimeError( - "Install requests library to use metadata credentials provider" - ) + assert ( + requests is not None + ), "Install requests library to use metadata credentials provider" + self.extra_error_message = "Check that metadata service configured properly since we failed to fetch it from metadata_url." self._metadata_url = ( DEFAULT_METADATA_URL if metadata_url is None else metadata_url ) self._tp.submit(self._refresh) - self.extra_error_message = "Check that metadata service configured properly and application deployed in VM or function at Yandex.Cloud." @tracing.with_trace() - def _get_iam_token(self): + def _make_token_request(self): response = requests.get( self._metadata_url, headers={"Metadata-Flavor": "Google"}, timeout=3 ) diff --git a/ydb/public/sdk/python/ydb/settings.py b/ydb/public/sdk/python/ydb/settings.py index f55b1abb30..6739a46fab 100644 --- a/ydb/public/sdk/python/ydb/settings.py +++ b/ydb/public/sdk/python/ydb/settings.py @@ -11,6 +11,7 @@ class BaseRequestSettings(object): "tracer", "compression", "headers", + "need_rpc_auth", ) def __init__(self): @@ -23,6 +24,7 @@ class BaseRequestSettings(object): self.cancel_after = None self.operation_timeout = None self.compression = None + self.need_rpc_auth = True self.headers = [] def make_copy(self): @@ -34,6 +36,7 @@ class BaseRequestSettings(object): .with_cancel_after(self.cancel_after) .with_operation_timeout(self.operation_timeout) .with_compression(self.compression) + .with_need_rpc_auth(self.need_rpc_auth) ) def with_compression(self, compression): @@ -45,6 +48,10 @@ class BaseRequestSettings(object): self.compression = compression return self + def with_need_rpc_auth(self, need_rpc_auth): + self.need_rpc_auth = need_rpc_auth + return self + def with_header(self, key, value): """ Adds a key-value pair to the request headers. diff --git a/ydb/public/sdk/python/ydb/table.py b/ydb/public/sdk/python/ydb/table.py index c3c083d312..2241de3590 100644 --- a/ydb/public/sdk/python/ydb/table.py +++ b/ydb/public/sdk/python/ydb/table.py @@ -997,6 +997,18 @@ class TableClientSettings(object): self._native_date_in_result_sets = False self._make_result_sets_lazy = False self._native_json_in_result_sets = False + self._native_interval_in_result_sets = False + self._native_timestamp_in_result_sets = False + + def with_native_timestamp_in_result_sets(self, enabled): + # type:(bool) -> ydb.TableClientSettings + self._native_timestamp_in_result_sets = enabled + return self + + def with_native_interval_in_result_sets(self, enabled): + # type:(bool) -> ydb.TableClientSettings + self._native_interval_in_result_sets = enabled + return self def with_native_json_in_result_sets(self, enabled): # type:(bool) -> ydb.TableClientSettings diff --git a/ydb/public/sdk/python/ydb/types.py b/ydb/public/sdk/python/ydb/types.py index 1f40bb230c..598a9013aa 100644 --- a/ydb/public/sdk/python/ydb/types.py +++ b/ydb/public/sdk/python/ydb/types.py @@ -4,12 +4,14 @@ import enum import six import json from . import _utilities, _apis -from datetime import date, datetime +from datetime import date, datetime, timedelta import uuid import struct from google.protobuf import struct_pb2 +_SECONDS_IN_DAY = 60 * 60 * 24 +_EPOCH = datetime(1970, 1, 1) if six.PY3: _from_bytes = None else: @@ -56,6 +58,42 @@ def _from_uuid(pb, value): pb.high_128 = struct.unpack("Q", value.bytes_le[8:16])[0] +def _from_interval(value_pb, table_client_settings): + if ( + table_client_settings is not None + and table_client_settings._native_interval_in_result_sets + ): + return timedelta(microseconds=value_pb.int64_value) + return value_pb.int64_value + + +def _timedelta_to_microseconds(value): + return (value.days * _SECONDS_IN_DAY + value.seconds) * 1000000 + value.microseconds + + +def _to_interval(pb, value): + if isinstance(value, timedelta): + pb.int64_value = _timedelta_to_microseconds(value) + else: + pb.int64_value = value + + +def _from_timestamp(value_pb, table_client_settings): + if ( + table_client_settings is not None + and table_client_settings._native_timestamp_in_result_sets + ): + return _EPOCH + timedelta(microseconds=value_pb.uint64_value) + return value_pb.uint64_value + + +def _to_timestamp(pb, value): + if isinstance(value, datetime): + pb.uint64_value = _timedelta_to_microseconds(value - _EPOCH) + else: + pb.uint64_value = value + + @enum.unique class PrimitiveType(enum.Enum): """ @@ -81,8 +119,7 @@ class PrimitiveType(enum.Enum): Yson = _apis.primitive_types.YSON, "bytes_value" Json = _apis.primitive_types.JSON, "text_value", _from_json JsonDocument = _apis.primitive_types.JSON_DOCUMENT, "text_value", _from_json - UUID = _apis.primitive_types.UUID, None, _to_uuid, _from_uuid - + UUID = (_apis.primitive_types.UUID, None, _to_uuid, _from_uuid) Date = ( _apis.primitive_types.DATE, "uint32_value", @@ -93,8 +130,18 @@ class PrimitiveType(enum.Enum): "uint32_value", _from_datetime_number, ) - Timestamp = _apis.primitive_types.TIMESTAMP, "uint64_value" - Interval = _apis.primitive_types.INTERVAL, "int64_value" + Timestamp = ( + _apis.primitive_types.TIMESTAMP, + None, + _from_timestamp, + _to_timestamp, + ) + Interval = ( + _apis.primitive_types.INTERVAL, + None, + _from_interval, + _to_interval, + ) DyNumber = _apis.primitive_types.DYNUMBER, "text_value", _from_bytes diff --git a/ydb/public/sdk/python/ydb/ydb_version.py b/ydb/public/sdk/python/ydb/ydb_version.py index 84e6495dd8..7035cc17c4 100644 --- a/ydb/public/sdk/python/ydb/ydb_version.py +++ b/ydb/public/sdk/python/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "2.5.0" +VERSION = "2.8.0" |