diff options
author | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:24:06 +0300 |
---|---|---|
committer | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:41:34 +0300 |
commit | e0e3e1717e3d33762ce61950504f9637a6e669ed (patch) | |
tree | bca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/s3transfer/py2/tests/functional/test_manager.py | |
parent | 38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff) | |
download | ydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz |
add ydb deps
Diffstat (limited to 'contrib/python/s3transfer/py2/tests/functional/test_manager.py')
-rw-r--r-- | contrib/python/s3transfer/py2/tests/functional/test_manager.py | 184 |
1 files changed, 184 insertions, 0 deletions
diff --git a/contrib/python/s3transfer/py2/tests/functional/test_manager.py b/contrib/python/s3transfer/py2/tests/functional/test_manager.py new file mode 100644 index 0000000000..a6db74b262 --- /dev/null +++ b/contrib/python/s3transfer/py2/tests/functional/test_manager.py @@ -0,0 +1,184 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the 'License'). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the 'license' file accompanying this file. This file is +# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from io import BytesIO +from botocore.awsrequest import create_request_object +import mock + +from __tests__ import skip_if_using_serial_implementation +from __tests__ import StubbedClientTest +from s3transfer.exceptions import CancelledError +from s3transfer.exceptions import FatalError +from s3transfer.futures import BaseExecutor +from s3transfer.manager import TransferManager +from s3transfer.manager import TransferConfig + + +class ArbitraryException(Exception): + pass + + +class SignalTransferringBody(BytesIO): + """A mocked body with the ability to signal when transfers occur""" + def __init__(self): + super(SignalTransferringBody, self).__init__() + self.signal_transferring_call_count = 0 + self.signal_not_transferring_call_count = 0 + + def signal_transferring(self): + self.signal_transferring_call_count += 1 + + def signal_not_transferring(self): + self.signal_not_transferring_call_count += 1 + + def seek(self, where, whence=0): + pass + + def tell(self): + return 0 + + def read(self, amount=0): + return b'' + + +class TestTransferManager(StubbedClientTest): + @skip_if_using_serial_implementation( + 'Exception is thrown once all transfers are submitted. ' + 'However for the serial implementation, transfers are performed ' + 'in main thread meaning all transfers will complete before the ' + 'exception being thrown.' + ) + def test_error_in_context_manager_cancels_incomplete_transfers(self): + # The purpose of this test is to make sure if an error is raised + # in the body of the context manager, incomplete transfers will + # be cancelled with value of the exception wrapped by a CancelledError + + # NOTE: The fact that delete() was chosen to test this is arbitrary + # other than it is the easiet to set up for the stubber. + # The specific operation is not important to the purpose of this test. + num_transfers = 100 + futures = [] + ref_exception_msg = 'arbitrary exception' + + for _ in range(num_transfers): + self.stubber.add_response('delete_object', {}) + + manager = TransferManager( + self.client, + TransferConfig( + max_request_concurrency=1, max_submission_concurrency=1) + ) + try: + with manager: + for i in range(num_transfers): + futures.append(manager.delete('mybucket', 'mykey')) + raise ArbitraryException(ref_exception_msg) + except ArbitraryException: + # At least one of the submitted futures should have been + # cancelled. + with self.assertRaisesRegexp(FatalError, ref_exception_msg): + for future in futures: + future.result() + + @skip_if_using_serial_implementation( + 'Exception is thrown once all transfers are submitted. ' + 'However for the serial implementation, transfers are performed ' + 'in main thread meaning all transfers will complete before the ' + 'exception being thrown.' + ) + def test_cntrl_c_in_context_manager_cancels_incomplete_transfers(self): + # The purpose of this test is to make sure if an error is raised + # in the body of the context manager, incomplete transfers will + # be cancelled with value of the exception wrapped by a CancelledError + + # NOTE: The fact that delete() was chosen to test this is arbitrary + # other than it is the easiet to set up for the stubber. + # The specific operation is not important to the purpose of this test. + num_transfers = 100 + futures = [] + + for _ in range(num_transfers): + self.stubber.add_response('delete_object', {}) + + manager = TransferManager( + self.client, + TransferConfig( + max_request_concurrency=1, max_submission_concurrency=1) + ) + try: + with manager: + for i in range(num_transfers): + futures.append(manager.delete('mybucket', 'mykey')) + raise KeyboardInterrupt() + except KeyboardInterrupt: + # At least one of the submitted futures should have been + # cancelled. + with self.assertRaisesRegexp( + CancelledError, 'KeyboardInterrupt()'): + for future in futures: + future.result() + + def test_enable_disable_callbacks_only_ever_registered_once(self): + body = SignalTransferringBody() + request = create_request_object({ + 'method': 'PUT', + 'url': 'https://s3.amazonaws.com', + 'body': body, + 'headers': {}, + 'context': {} + }) + # Create two TransferManager's using the same client + TransferManager(self.client) + TransferManager(self.client) + self.client.meta.events.emit( + 'request-created.s3', request=request, operation_name='PutObject') + # The client should have only have the enable/disable callback + # handlers registered once depite being used for two different + # TransferManagers. + self.assertEqual( + body.signal_transferring_call_count, 1, + 'The enable_callback() should have only ever been registered once') + self.assertEqual( + body.signal_not_transferring_call_count, 1, + 'The disable_callback() should have only ever been registered ' + 'once') + + def test_use_custom_executor_implementation(self): + mocked_executor_cls = mock.Mock(BaseExecutor) + transfer_manager = TransferManager( + self.client, executor_cls=mocked_executor_cls) + transfer_manager.delete('bucket', 'key') + self.assertTrue(mocked_executor_cls.return_value.submit.called) + + def test_unicode_exception_in_context_manager(self): + with self.assertRaises(ArbitraryException): + with TransferManager(self.client): + raise ArbitraryException(u'\u2713') + + def test_client_property(self): + manager = TransferManager(self.client) + self.assertIs(manager.client, self.client) + + def test_config_property(self): + config = TransferConfig() + manager = TransferManager(self.client, config) + self.assertIs(manager.config, config) + + def test_can_disable_bucket_validation(self): + s3_object_lambda_arn = ( + 'arn:aws:s3-object-lambda:us-west-2:123456789012:' + 'accesspoint:my-accesspoint' + ) + config = TransferConfig() + manager = TransferManager(self.client, config) + manager.VALIDATE_SUPPORTED_BUCKET_VALUES = False + manager.delete(s3_object_lambda_arn, 'my-key') |