author    rekby <rekby@ydb.tech>    2022-11-23 13:10:47 +0300
committer rekby <rekby@ydb.tech>    2022-11-23 13:10:47 +0300
commit    caf24070c3ecf8d3dfb071b2147e9afe89a4e1ac (patch)
tree      0eaca273dbc561dde37a0356f3b2e8767d66ec5e
parent    f611c09f526b41bb046026cd6336f6280926816c (diff)
download  ydb-caf24070c3ecf8d3dfb071b2147e9afe89a4e1ac.tar.gz
remove s3internal client to prevent a dependency on draft protobuf
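The removed S3InternalClient wrapped the S3Listing RPC from the draft ydb_s3_internal protobuf API, which is why dropping it removes the draft dependency. A minimal sketch of how the deleted client was invoked (the endpoint and argument values are hypothetical; the names come from the deleted s3list.py below):

    import ydb

    driver = ydb.Driver(ydb.DriverConfig("localhost:2135"))  # hypothetical endpoint
    client = ydb.S3InternalClient(driver)
    result = client.s3_listing(
        "/Root/S3/Objects",  # table_name (path used in the deleted tests)
        ["Bucket1"],         # key_prefix: leading primary-key columns
        "",                  # path_column_prefix
        "/",                 # path_column_delimiter
        10,                  # max_keys
    )
    # result is an S3ListingResult carrying contents, common_prefixes,
    # is_truncated and continue_after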
-rw-r--r--  ydb/public/sdk/python/ydb/__init__.py          1
-rw-r--r--  ydb/public/sdk/python/ydb/s3list.py          193
-rw-r--r--  ydb/tests/functional/api/test_s3_listing.py  153
3 files changed, 0 insertions, 347 deletions
diff --git a/ydb/public/sdk/python/ydb/__init__.py b/ydb/public/sdk/python/ydb/__init__.py
index 5cbe4f1a2d6..d8c23fee4cf 100644
--- a/ydb/public/sdk/python/ydb/__init__.py
+++ b/ydb/public/sdk/python/ydb/__init__.py
@@ -7,7 +7,6 @@ from .scheme import * # noqa
from .settings import * # noqa
from .resolver import * # noqa
from .export import * # noqa
-from .s3list import * # noqa
from .auth_helpers import * # noqa
from .operation import * # noqa
from .scripting import * # noqa
diff --git a/ydb/public/sdk/python/ydb/s3list.py b/ydb/public/sdk/python/ydb/s3list.py
deleted file mode 100644
index 5b3825933cd..00000000000
--- a/ydb/public/sdk/python/ydb/s3list.py
+++ /dev/null
@@ -1,193 +0,0 @@
-# -*- coding: utf-8 -*-
-from ydb.public.api.grpc.draft import ydb_s3_internal_v1_pb2_grpc
-from ydb.public.api.protos import ydb_s3_internal_pb2
-from . import convert, issues, types
-
-
-_S3Listing = "S3Listing"
-
-
-def _prepare_tuple_typed_value(vl):
-    if vl is None:
-        return None
-
-    tpb = types.TupleType()
-    for _ in vl:
-        tpb.add_element(types.OptionalType(types.PrimitiveType.Utf8))
-
-    return convert.to_typed_value_from_native(tpb.proto, vl)
-
-
-def _s3_listing_request_factory(
-    table_name,
-    key_prefix,
-    path_column_prefix,
-    path_column_delimiter,
-    max_keys,
-    columns_to_return=None,
-    start_after_key_suffix=None,
-):
-    columns_to_return = [] if columns_to_return is None else columns_to_return
-    start_after_key_suffix = (
-        [] if start_after_key_suffix is None else start_after_key_suffix
-    )
-    return ydb_s3_internal_pb2.S3ListingRequest(
-        table_name=table_name,
-        key_prefix=_prepare_tuple_typed_value(key_prefix),
-        path_column_prefix=path_column_prefix,
-        path_column_delimiter=path_column_delimiter,
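-        # request one extra key so the response wrapper can detect
-        # truncation without a second round trip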
-        max_keys=max_keys + 1,
-        columns_to_return=columns_to_return,
-        start_after_key_suffix=_prepare_tuple_typed_value(start_after_key_suffix),
-    )
-
-
-class S3ListingResult(object):
-    __slots__ = (
-        "is_truncated",
-        "continue_after",
-        "path_column_delimiter",
-        "common_prefixes",
-        "contents",
-    )
-
-    def __init__(
-        self,
-        is_truncated,
-        continue_after,
-        path_column_delimiter,
-        common_prefixes,
-        contents,
-    ):
-        self.is_truncated = is_truncated
-        self.continue_after = continue_after
-        self.path_column_delimiter = path_column_delimiter
-        self.common_prefixes = common_prefixes
-        self.contents = contents
-
-
-def _wrap_s3_listing_response(rpc_state, response, path_column_delimiter, max_keys):
-    issues._process_response(response.operation)
-    message = ydb_s3_internal_pb2.S3ListingResult()
-    response.operation.result.Unpack(message)
-    common_prefixes_rs = convert.ResultSet.from_message(message.common_prefixes)
-    common_prefixes = [row[0] for row in common_prefixes_rs.rows]
-    contents = convert.ResultSet.from_message(message.contents).rows
-    key_suffix_size = max(1, message.key_suffix_size)
-
-    continue_after = None
-    is_truncated = False
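-    # The request above asked for max_keys + 1 entries, so an overflow means
-    # the listing is truncated: drop the extra (lexicographically last) entry
-    # and remember the key to continue after.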
-    if len(contents) + len(common_prefixes) > max_keys:
-        is_truncated = True
-
-        if len(contents) > 0 and len(common_prefixes) > 0:
-            if contents[-1][0] < common_prefixes[-1]:
-                common_prefixes.pop()
-            else:
-                contents.pop()
-        elif len(contents) > 0:
-            contents.pop()
-        elif len(common_prefixes) > 0:
-            common_prefixes.pop()
-
-        use_contents_for_continue_after = True
-        if len(contents) > 0 and len(common_prefixes) > 0:
-            use_contents_for_continue_after = contents[-1][0] > common_prefixes[-1]
-        elif len(common_prefixes) > 0:
-            use_contents_for_continue_after = False
-
-        if use_contents_for_continue_after:
-            continue_after = contents[-1][:key_suffix_size]
-        else:
-            continue_after = [common_prefixes[-1]]
-
-    return S3ListingResult(
-        is_truncated=is_truncated,
-        continue_after=continue_after,
-        path_column_delimiter=path_column_delimiter,
-        common_prefixes=common_prefixes,
-        contents=contents,
-    )
-
-
-class S3InternalClient(object):
-    def __init__(self, driver):
-        self._driver = driver
-
-    @staticmethod
-    def _handle(
-        callee,
-        table_name,
-        key_prefix,
-        path_column_prefix,
-        path_column_delimiter,
-        max_keys,
-        columns_to_return=None,
-        start_after_key_suffix=None,
-        settings=None,
-    ):
-        return callee(
-            _s3_listing_request_factory(
-                table_name,
-                key_prefix,
-                path_column_prefix,
-                path_column_delimiter,
-                max_keys,
-                columns_to_return,
-                start_after_key_suffix,
-            ),
-            ydb_s3_internal_v1_pb2_grpc.S3InternalServiceStub,
-            _S3Listing,
-            _wrap_s3_listing_response,
-            settings,
-            (
-                path_column_delimiter,
-                max_keys,
-            ),
-        )
-
-    def s3_listing(
-        self,
-        table_name,
-        key_prefix,
-        path_column_prefix,
-        path_column_delimiter,
-        max_keys,
-        columns_to_return=None,
-        start_after_key_suffix=None,
-        settings=None,
-    ):
-        return self._handle(
-            self._driver,
-            table_name,
-            key_prefix,
-            path_column_prefix,
-            path_column_delimiter,
-            max_keys,
-            columns_to_return,
-            start_after_key_suffix,
-            settings,
-        )
-
-    def async_s3_listing(
-        self,
-        table_name,
-        key_prefix,
-        path_column_prefix,
-        path_column_delimiter,
-        max_keys,
-        columns_to_return=None,
-        start_after_key_suffix=None,
-        settings=None,
-    ):
-        return self._handle(
-            self._driver.future,
-            table_name,
-            key_prefix,
-            path_column_prefix,
-            path_column_delimiter,
-            max_keys,
-            columns_to_return,
-            start_after_key_suffix,
-            settings,
-        )
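Callers were expected to drive pagination themselves: feed continue_after back in as start_after_key_suffix until is_truncated comes back False, exactly as the deleted functional test below does. A condensed sketch of that loop (client as in the sketch above; the table path is taken from the tests):

    start_after = None
    while True:
        result = client.s3_listing(
            "/Root/S3/Objects", ["Bucket1"], "", "/", 10,
            columns_to_return=[], start_after_key_suffix=start_after,
        )
        for prefix in result.common_prefixes:
            print("dir:", prefix)      # directory-like entries
        for row in result.contents:
            print("object:", row[0])   # row[0] is the key suffix
        if not result.is_truncated:
            break
        start_after = result.continue_after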
diff --git a/ydb/tests/functional/api/test_s3_listing.py b/ydb/tests/functional/api/test_s3_listing.py
deleted file mode 100644
index db1a2211ec5..00000000000
--- a/ydb/tests/functional/api/test_s3_listing.py
+++ /dev/null
@@ -1,153 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-
-from hamcrest import assert_that, equal_to, not_, contains
-
-from ydb.tests.library.harness.kikimr_cluster import kikimr_cluster_factory
-import ydb as ydb
-
-
-logger = logging.getLogger(__name__)
-
-
-class TestS3ListingAPI(object):
-    @classmethod
-    def setup_class(cls):
-        cls.cluster = kikimr_cluster_factory()
-        cls.cluster.start()
-        cls.driver = ydb.Driver(
-            ydb.DriverConfig("%s:%s" % (cls.cluster.nodes[1].host, cls.cluster.nodes[1].port)))
-        cls.s3client = ydb.S3InternalClient(cls.driver)
-        cls.pool = ydb.SessionPool(cls.driver)
-
-    @classmethod
-    def teardown_class(cls):
-        if hasattr(cls, 'cluster'):
-            cls.cluster.stop()
-
-    def _prepare_test_data(self, table, bucket, paths):
-        with self.pool.checkout() as session:
-            session.execute_scheme("""
-                create table `{table_name}` (
-                    BucketName Utf8,
-                    DataMd5 String,
-                    DataSize Uint64,
-                    DataSource String,
-                    DataSourceType String,
-                    Metadata String,
-                    MetadataType String,
-                    Name Utf8,
-                    PartsCount Uint64,
-                    UploadStartedUsec Uint64,
-                    primary key (BucketName, Name)
-                );
-            """.format(table_name=table))
-
-        for path in paths:
-            with self.pool.checkout() as session:
-                session.transaction().execute(
-                    "upsert into `{table_name}` (BucketName, Name, Metadata) values"
-                    " ('{bucket}', '{path}', 'some data');".format(bucket=bucket, table_name=table, path=path),
-                    commit_tx=True
-                )
-
-    def test_s3_listing_full(self):
-        table = '/Root/S3/Objects'
-
-        bucket1_paths = ['/home/test_{name}'.format(name=x) for x in range(50)]
-        bucket1_paths.extend(['/home/test_{name}/main.cpp'.format(name=x + 10) for x in range(30)])
-
-        self._prepare_test_data(table, 'Bucket1', bucket1_paths)
-
-        bucket2_paths = ['asdf', 'boo/bar', 'boo/baz/xyzzy', 'cquux/thud', 'cquux/bla']
-        self._prepare_test_data(table, 'Bucket2', bucket2_paths)
-
-        self.do_test(bucket1_paths, table, 'Bucket1', '', '/', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '', '', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home', '/', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/', '/', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/etc/', '/', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/etc/', '', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/te', '/', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/te', '', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/test_1', '', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/test_1', '/', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/test_1/', '', 7, ['DataMd5', 'Metadata'])
-        self.do_test(bucket1_paths, table, 'Bucket1', '/home/test_1/', '/', 7, ['DataMd5', 'Metadata'])
-
-        for max_keys in range(2, 10):
-            self.do_test(bucket2_paths, table, 'Bucket2', '', '', max_keys, ['DataMd5', 'Metadata'])
-            self.do_test(bucket2_paths, table, 'Bucket2', '', '/', max_keys, ['DataMd5', 'Metadata'])
-            self.do_test(bucket1_paths, table, 'Bucket1', '/home/', '/', max_keys, ['DataMd5', 'Metadata'])
-
-    def test_s3_listing_max_keys(self):
-        table = '/Root/S3/Objects'
-
-        bucket2_paths = ['asdf', 'boo/bar', 'boo/baz/xyzzy', 'cquux/thud', 'cquux/bla']
-        self._prepare_test_data(table, 'Bucket2', bucket2_paths)
-
-        result = self.s3client.s3_listing(table, ['Bucket2'], '', '/', 2, [], '')
-        logger.debug(result)
-        assert_that(result.common_prefixes, equal_to(['boo/']))
-        assert_that(len(result.contents), equal_to(1))
-        assert_that(result.contents[0][0], equal_to('asdf'))
-
-    def do_test(self, all_paths, table, bucket, prefix, delimiter, step, columns):
-        # compares the real result of an s3 listing request with the expected result
-        prefixes, contents = self.do_s3_listing(table, [bucket], prefix, delimiter, step, columns)
-        logger.debug(prefixes)
-        logger.debug(contents)
-        expected_prefixes, expected_contents = self.emulate_listing(all_paths, prefix, delimiter)
-        logger.debug(expected_prefixes)
-        logger.debug(expected_contents)
-        assert_that(prefixes, equal_to(expected_prefixes))
-        assert_that(contents, equal_to(expected_contents))
-
-    def do_s3_listing(self, table, prefix_columns, path_prefix, delimiter, step, columns):
-        start_after = None
-        more_data = True
-        all_common_prefixes = set()
-        all_contents = set()
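-        # hard upper bound on iterations so a broken pagination protocol
-        # fails the test instead of looping forever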
-        for i in range(1, 50):
-            result = self.s3client.s3_listing(
-                table, prefix_columns, path_prefix, delimiter, step, columns, start_after)
-            # logger.debug(result)
-
-            for prefix in result.common_prefixes:
-                assert_that(all_common_prefixes, not_(contains(prefix)))
-                all_common_prefixes.add(prefix)
-
-            for obj in result.contents:
-                path = obj[0]
-                assert_that(all_contents, not_(contains(path)))
-                all_contents.add(path)
-
-            more_data = result.is_truncated
-            if more_data:
-                # logger.debug('ContinueAfter: ' + str(result.continue_after))
-                start_after = result.continue_after
-            else:
-                break
-        assert_that(not more_data)
-        return all_common_prefixes, all_contents
-
-    def emulate_listing(self, all_paths, path_prefix, delimiter):
-        # filter all_paths by prefix and delimiter to produce
-        # a result that mirrors the s3 listing logic
-        all_common_prefixes = set()
-        all_contents = set()
-        for p in all_paths:
-            if not p.startswith(path_prefix):
-                continue
-            if delimiter == '':
-                all_contents.add(p)
-                continue
-            delim_pos = p.find(delimiter, len(path_prefix))
-            if delim_pos == -1:
-                all_contents.add(p)
-            else:
-                all_common_prefixes.add(p[:delim_pos + 1])
-
-        return all_common_prefixes, all_contents
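Since emulate_listing is a pure reference model, its expected output for the bucket2 fixture above can be worked out by hand; with a delimiter, every key collapses to the text up to and including its first delimiter after the prefix:

    paths = ['asdf', 'boo/bar', 'boo/baz/xyzzy', 'cquux/thud', 'cquux/bla']
    # emulate_listing(paths, '', '/') yields
    #   common_prefixes == {'boo/', 'cquux/'}
    #   contents == {'asdf'}   # no '/' in the key, so it stays a plain object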