| author | rekby <rekby@ydb.tech> | 2022-11-23 13:10:47 +0300 |
|---|---|---|
| committer | rekby <rekby@ydb.tech> | 2022-11-23 13:10:47 +0300 |
| commit | caf24070c3ecf8d3dfb071b2147e9afe89a4e1ac (patch) | |
| tree | 0eaca273dbc561dde37a0356f3b2e8767d66ec5e | |
| parent | f611c09f526b41bb046026cd6336f6280926816c (diff) | |
| download | ydb-caf24070c3ecf8d3dfb071b2147e9afe89a4e1ac.tar.gz | |
remove s3internal client to prevent a dependency on draft protobuf
| -rw-r--r-- | ydb/public/sdk/python/ydb/__init__.py | 1 |
| -rw-r--r-- | ydb/public/sdk/python/ydb/s3list.py | 193 |
| -rw-r--r-- | ydb/tests/functional/api/test_s3_listing.py | 153 |
3 files changed, 0 insertions, 347 deletions
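
For reference, a minimal sketch of how the removed client was driven, reconstructed from the deleted test code below. The endpoint string is a placeholder, and this is exactly the API the commit deletes — after it lands, `ydb.S3InternalClient` no longer exists:

```python
# Usage sketch of the removed API, based on the deleted test code below.
# The endpoint is a placeholder; S3InternalClient is gone after this commit.
import ydb

driver = ydb.Driver(ydb.DriverConfig("localhost:2135"))  # placeholder endpoint
driver.wait(timeout=5)  # wait for endpoint discovery before issuing requests

s3client = ydb.S3InternalClient(driver)

# List at most 2 keys from the 'Bucket2' rows of the objects table,
# grouping names that share a '/'-delimited prefix into common_prefixes.
result = s3client.s3_listing(
    "/Root/S3/Objects",  # table with primary key (BucketName, Name)
    ["Bucket2"],         # key_prefix: values for the leading key columns
    "",                  # path_column_prefix
    "/",                 # path_column_delimiter
    2,                   # max_keys
    [],                  # columns_to_return
    "",                  # start_after_key_suffix
)
print(result.common_prefixes)  # e.g. ['boo/']
print(result.contents)         # rows for non-grouped names
print(result.is_truncated)     # True if more results remain
```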
diff --git a/ydb/public/sdk/python/ydb/__init__.py b/ydb/public/sdk/python/ydb/__init__.py
index 5cbe4f1a2d6..d8c23fee4cf 100644
--- a/ydb/public/sdk/python/ydb/__init__.py
+++ b/ydb/public/sdk/python/ydb/__init__.py
@@ -7,7 +7,6 @@ from .scheme import *  # noqa
 from .settings import *  # noqa
 from .resolver import *  # noqa
 from .export import *  # noqa
-from .s3list import *  # noqa
 from .auth_helpers import *  # noqa
 from .operation import *  # noqa
 from .scripting import *  # noqa
diff --git a/ydb/public/sdk/python/ydb/s3list.py b/ydb/public/sdk/python/ydb/s3list.py
deleted file mode 100644
index 5b3825933cd..00000000000
--- a/ydb/public/sdk/python/ydb/s3list.py
+++ /dev/null
@@ -1,193 +0,0 @@
-# -*- coding: utf-8 -*-
-from ydb.public.api.grpc.draft import ydb_s3_internal_v1_pb2_grpc
-from ydb.public.api.protos import ydb_s3_internal_pb2
-from . import convert, issues, types
-
-
-_S3Listing = "S3Listing"
-
-
-def _prepare_tuple_typed_value(vl):
-    if vl is None:
-        return None
-
-    tpb = types.TupleType()
-    for _ in vl:
-        tpb.add_element(types.OptionalType(types.PrimitiveType.Utf8))
-
-    return convert.to_typed_value_from_native(tpb.proto, vl)
-
-
-def _s3_listing_request_factory(
-    table_name,
-    key_prefix,
-    path_column_prefix,
-    path_column_delimiter,
-    max_keys,
-    columns_to_return=None,
-    start_after_key_suffix=None,
-):
-    columns_to_return = [] if columns_to_return is None else columns_to_return
-    start_after_key_suffix = (
-        [] if start_after_key_suffix is None else start_after_key_suffix
-    )
-    return ydb_s3_internal_pb2.S3ListingRequest(
-        table_name=table_name,
-        key_prefix=_prepare_tuple_typed_value(key_prefix),
-        path_column_prefix=path_column_prefix,
-        path_column_delimiter=path_column_delimiter,
-        max_keys=max_keys + 1,
-        columns_to_return=columns_to_return,
-        start_after_key_suffix=_prepare_tuple_typed_value(start_after_key_suffix),
-    )
-
-
-class S3ListingResult(object):
-    __slots__ = (
-        "is_truncated",
-        "continue_after",
-        "path_column_delimiter",
-        "common_prefixes",
-        "contents",
-    )
-
-    def __init__(
-        self,
-        is_truncated,
-        continue_after,
-        path_column_delimiter,
-        common_prefixes,
-        contents,
-    ):
-        self.is_truncated = is_truncated
-        self.continue_after = continue_after
-        self.path_column_delimiter = path_column_delimiter
-        self.common_prefixes = common_prefixes
-        self.contents = contents
-
-
-def _wrap_s3_listing_response(rpc_state, response, path_column_delimiter, max_keys):
-    issues._process_response(response.operation)
-    message = ydb_s3_internal_pb2.S3ListingResult()
-    response.operation.result.Unpack(message)
-    common_prefixes_rs = convert.ResultSet.from_message(message.common_prefixes)
-    common_prefixes = [row[0] for row in common_prefixes_rs.rows]
-    contents = convert.ResultSet.from_message(message.contents).rows
-    key_suffix_size = max(1, message.key_suffix_size)
-
-    continue_after = None
-    is_truncated = False
-    if len(contents) + len(common_prefixes) > max_keys:
-        is_truncated = True
-
-        if len(contents) > 0 and len(common_prefixes) > 0:
-            if contents[-1][0] < common_prefixes[-1]:
-                common_prefixes.pop()
-            else:
-                contents.pop()
-        elif len(contents) > 0:
-            contents.pop()
-        elif len(common_prefixes) > 0:
-            common_prefixes.pop()
-
-        use_contents_for_continue_after = True
-        if len(contents) > 0 and len(common_prefixes) > 0:
-            use_contents_for_continue_after = contents[-1][0] > common_prefixes[-1]
-        elif len(common_prefixes) > 0:
-            use_contents_for_continue_after = False
-
-        if use_contents_for_continue_after:
-            continue_after = contents[-1][:key_suffix_size]
-        else:
-            continue_after = [common_prefixes[-1]]
-
-    return S3ListingResult(
-        is_truncated=is_truncated,
-        continue_after=continue_after,
-        path_column_delimiter=path_column_delimiter,
-        common_prefixes=common_prefixes,
-        contents=contents,
-    )
-
-
-class S3InternalClient(object):
-    def __init__(self, driver):
-        self._driver = driver
-
-    @staticmethod
-    def _handle(
-        callee,
-        table_name,
-        key_prefix,
-        path_column_prefix,
-        path_column_delimiter,
-        max_keys,
-        columns_to_return=None,
-        start_after_key_suffix=None,
-        settings=None,
-    ):
-        return callee(
-            _s3_listing_request_factory(
-                table_name,
-                key_prefix,
-                path_column_prefix,
-                path_column_delimiter,
-                max_keys,
-                columns_to_return,
-                start_after_key_suffix,
-            ),
-            ydb_s3_internal_v1_pb2_grpc.S3InternalServiceStub,
-            _S3Listing,
-            _wrap_s3_listing_response,
-            settings,
-            (
-                path_column_delimiter,
-                max_keys,
-            ),
-        )
-
-    def s3_listing(
-        self,
-        table_name,
-        key_prefix,
-        path_column_prefix,
-        path_column_delimiter,
-        max_keys,
-        columns_to_return=None,
-        start_after_key_suffix=None,
-        settings=None,
-    ):
-        return self._handle(
-            self._driver,
-            table_name,
-            key_prefix,
-            path_column_prefix,
-            path_column_delimiter,
-            max_keys,
-            columns_to_return,
-            start_after_key_suffix,
-            settings,
-        )
-
-    def async_s3_listing(
-        self,
-        table_name,
-        key_prefix,
-        path_column_prefix,
-        path_column_delimiter,
-        max_keys,
-        columns_to_return=None,
-        start_after_key_suffix=None,
-        settings=None,
-    ):
-        return self._handle(
-            self._driver.future,
-            table_name,
-            key_prefix,
-            path_column_prefix,
-            path_column_delimiter,
-            max_keys,
-            columns_to_return,
-            start_after_key_suffix,
-            settings,
-        )
diff --git a/ydb/tests/functional/api/test_s3_listing.py b/ydb/tests/functional/api/test_s3_listing.py
deleted file mode 100644
index db1a2211ec5..00000000000
--- a/ydb/tests/functional/api/test_s3_listing.py
+++ /dev/null
@@ -1,153 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-
-from hamcrest import assert_that, equal_to, not_, contains
-
-from ydb.tests.library.harness.kikimr_cluster import kikimr_cluster_factory
-import ydb as ydb
-
-
-logger = logging.getLogger(__name__)
-
-
-class TestS3ListingAPI(object):
-    @classmethod
-    def setup_class(cls):
-        cls.cluster = kikimr_cluster_factory()
-        cls.cluster.start()
-        cls.driver = ydb.Driver(
-            ydb.DriverConfig("%s:%s" % (cls.cluster.nodes[1].host, cls.cluster.nodes[1].port)))
-        cls.s3client = ydb.S3InternalClient(cls.driver)
-        cls.pool = ydb.SessionPool(cls.driver)
-
-    @classmethod
-    def teardown_class(cls):
-        if hasattr(cls, 'kikimr_cluster'):
-            cls.kikimr_cluster.stop()
-
-    def _prepare_test_data(self, table, bucket, paths):
-        with self.pool.checkout() as session:
-            session.execute_scheme("""
-                create table `{table_name}` (
-                    BucketName Utf8,
-                    DataMd5 String,
-                    DataSize Uint64,
-                    DataSource String,
-                    DataSourceType String,
-                    Metadata String,
-                    MetadataType String,
-                    Name utf8,
-                    PartsCount Uint64,
-                    UploadStartedUsec Uint64,
-                    primary key (BucketName, Name)
-                );
-            """.format(table_name=table))
-
-        for path in paths:
-            with self.pool.checkout() as session:
-                session.transaction().execute(
-                    "upsert into `{table_name}` (BucketName, Name, Metadata) values"
-                    " ('{bucket}', '{path}', 'some data');".format(bucket=bucket, table_name=table, path=path),
-                    commit_tx=True
-                )
-
-    def test_s3_listing_full(self):
-        table = '/Root/S3/Objects'
-
-        bucket1_paths = ['/home/test_{name}'.format(name=x) for x in range(50)]
-        bucket1_paths.extend(['/home/test_{name}/main.cpp'.format(name=x+10)
-                              for x in range(30)])
-
-        self._prepare_test_data(table, 'Bucket1', bucket1_paths)
-
-        bucket2_paths = ['asdf', 'boo/bar', 'boo/baz/xyzzy', 'cquux/thud', 'cquux/bla']
-        self._prepare_test_data(table, 'Bucket2', bucket2_paths)
-
-        self.do_test(bucket1_paths, table, 'Bucket1', '', '/', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '', '', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home', '/', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/', '/', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/etc/', '/', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/etc/', '', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/te', '/', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/te', '', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/test_1', '', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/test_1', '/', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/test_1/', '', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/test_1/', '/', 7, ['DataMd5', 'Metadata'])
-
-        for max_keys in range(2, 10):
-            self.do_test(bucket2_paths, table, 'Bucket2', '', '', max_keys, ['DataMd5', 'Metadata'])
-            self.do_test(bucket2_paths, table, 'Bucket2', '', '/', max_keys, ['DataMd5', 'Metadata'])
-            self.do_test(bucket1_paths, table, 'Bucket1', '/home/', '/', max_keys, ['DataMd5', 'Metadata'])
-
-    def test_s3_listing_max_keys(self):
-        table = '/Root/S3/Objects'
-
-        bucket2_paths = ['asdf', 'boo/bar', 'boo/baz/xyzzy', 'cquux/thud', 'cquux/bla']
-        self._prepare_test_data(table, 'Bucket2', bucket2_paths)
-
-        result = self.s3client.s3_listing(table, ['Bucket2'], '', '/', 2, [], '')
-        logger.debug(result)
-        assert_that(result.common_prefixes, equal_to(['boo/']))
-        assert_that(len(result.contents), equal_to(1))
-        assert_that(result.contents[0][0], equal_to('asdf'))
-
-    def do_test(self, all_paths, table, bucket, prefix, delimiter, step, columns):
-        # compares the real result of an s3 listing request to the expected result
-        prefixes, contents = self.do_s3_listing(table, [bucket], prefix, delimiter, step, columns)
-        logger.debug(prefixes)
-        logger.debug(contents)
-        expected_prefixes, expected_contents = self.emulate_listing(all_paths, prefix, delimiter)
-        logger.debug(expected_prefixes)
-        logger.debug(expected_contents)
-        assert_that(prefixes, equal_to(expected_prefixes))
-        assert_that(contents, equal_to(expected_contents))
-
-    def do_s3_listing(self, table, prefix_columns, path_prefix, delimiter, step, columns):
-        start_after = None
-        more_data = True
-        all_common_prefixes = set()
-        all_contents = set()
-        for i in range(1, 50):
-            result = self.s3client.s3_listing(
-                table, prefix_columns, path_prefix, delimiter, step, columns, start_after)
-            # logger.debug(result)
-
-            for prefix in result.common_prefixes:
-                assert_that(all_common_prefixes, not_(contains(prefix)))
-                all_common_prefixes.add(prefix)
-
-            for obj in result.contents:
-                path = obj[0]
-                assert_that(all_contents, not_(contains(path)))
-                all_contents.add(path)
-
-            more_data = result.is_truncated
-            if more_data:
-                # logger.debug('ContinueAfter: ' + str(result.continue_after))
-                start_after = result.continue_after
-            else:
-                break
-        assert_that(not more_data)
-        return all_common_prefixes, all_contents
-
-    def emulate_listing(self, all_paths, path_prefix, delimiter):
-        # filter all_paths based on prefix and delimiter
-        # in order to produce a result similar to the s3 listing logic
-        all_common_prefixes = set()
-        all_contents = set()
-        for p in all_paths:
-            if not p.startswith(path_prefix):
-                continue
-            if delimiter == '':
-                all_contents.add(p)
-                continue
-            delim_pos = p.find(delimiter, len(path_prefix))
-            if delim_pos == -1:
-                all_contents.add(p)
-            else:
-                all_common_prefixes.add(p[:delim_pos+1])
-
-        return all_common_prefixes, all_contents
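
A note on the deleted `_wrap_s3_listing_response`: it implements a common pagination idiom. The request asks the server for `max_keys + 1` entries; if more than `max_keys` come back, the extra entry is popped, the result is marked truncated, and the last kept entry becomes the `continue_after` token for the next request. A standalone sketch of that idiom (names are hypothetical; sorted Python lists stand in for the gRPC result sets):

```python
# Sketch of the "ask for one extra row" truncation idiom used by the deleted
# _wrap_s3_listing_response. Names are hypothetical; plain sorted lists stand
# in for the server-side result sets.
def paginate(batch, max_keys):
    """Return (page, continue_after, is_truncated) for one listing step.

    `batch` is what the server returned for a request of max_keys + 1 rows.
    """
    is_truncated = len(batch) > max_keys
    page = batch[:max_keys]  # drop the extra sentinel row, if any
    # The last entry actually returned becomes the continuation token.
    continue_after = page[-1] if is_truncated else None
    return page, continue_after, is_truncated


keys = ['asdf', 'boo/bar', 'boo/baz/xyzzy', 'cquux/bla', 'cquux/thud']
max_keys = 2
start = None
while True:
    # A real client sends `start` back to the server; here we filter locally.
    remaining = keys if start is None else [k for k in keys if k > start]
    page, start, truncated = paginate(remaining[:max_keys + 1], max_keys)
    print(page)  # three pages of at most 2 keys each
    if not truncated:
        break
```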