aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/s3transfer/py2/tests/functional/test_manager.py
diff options
context:
space:
mode:
authornkozlovskiy <nmk@ydb.tech>2023-09-29 12:24:06 +0300
committernkozlovskiy <nmk@ydb.tech>2023-09-29 12:41:34 +0300
commite0e3e1717e3d33762ce61950504f9637a6e669ed (patch)
treebca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/s3transfer/py2/tests/functional/test_manager.py
parent38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff)
downloadydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz
add ydb deps
Diffstat (limited to 'contrib/python/s3transfer/py2/tests/functional/test_manager.py')
-rw-r--r--contrib/python/s3transfer/py2/tests/functional/test_manager.py184
1 files changed, 184 insertions, 0 deletions
diff --git a/contrib/python/s3transfer/py2/tests/functional/test_manager.py b/contrib/python/s3transfer/py2/tests/functional/test_manager.py
new file mode 100644
index 0000000000..a6db74b262
--- /dev/null
+++ b/contrib/python/s3transfer/py2/tests/functional/test_manager.py
@@ -0,0 +1,184 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License'). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the 'license' file accompanying this file. This file is
+# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from io import BytesIO
+from botocore.awsrequest import create_request_object
+import mock
+
+from __tests__ import skip_if_using_serial_implementation
+from __tests__ import StubbedClientTest
+from s3transfer.exceptions import CancelledError
+from s3transfer.exceptions import FatalError
+from s3transfer.futures import BaseExecutor
+from s3transfer.manager import TransferManager
+from s3transfer.manager import TransferConfig
+
+
+class ArbitraryException(Exception):
+ pass
+
+
+class SignalTransferringBody(BytesIO):
+ """A mocked body with the ability to signal when transfers occur"""
+ def __init__(self):
+ super(SignalTransferringBody, self).__init__()
+ self.signal_transferring_call_count = 0
+ self.signal_not_transferring_call_count = 0
+
+ def signal_transferring(self):
+ self.signal_transferring_call_count += 1
+
+ def signal_not_transferring(self):
+ self.signal_not_transferring_call_count += 1
+
+ def seek(self, where, whence=0):
+ pass
+
+ def tell(self):
+ return 0
+
+ def read(self, amount=0):
+ return b''
+
+
+class TestTransferManager(StubbedClientTest):
+ @skip_if_using_serial_implementation(
+ 'Exception is thrown once all transfers are submitted. '
+ 'However for the serial implementation, transfers are performed '
+ 'in main thread meaning all transfers will complete before the '
+ 'exception being thrown.'
+ )
+ def test_error_in_context_manager_cancels_incomplete_transfers(self):
+ # The purpose of this test is to make sure if an error is raised
+ # in the body of the context manager, incomplete transfers will
+ # be cancelled with value of the exception wrapped by a CancelledError
+
+ # NOTE: The fact that delete() was chosen to test this is arbitrary
+ # other than it is the easiet to set up for the stubber.
+ # The specific operation is not important to the purpose of this test.
+ num_transfers = 100
+ futures = []
+ ref_exception_msg = 'arbitrary exception'
+
+ for _ in range(num_transfers):
+ self.stubber.add_response('delete_object', {})
+
+ manager = TransferManager(
+ self.client,
+ TransferConfig(
+ max_request_concurrency=1, max_submission_concurrency=1)
+ )
+ try:
+ with manager:
+ for i in range(num_transfers):
+ futures.append(manager.delete('mybucket', 'mykey'))
+ raise ArbitraryException(ref_exception_msg)
+ except ArbitraryException:
+ # At least one of the submitted futures should have been
+ # cancelled.
+ with self.assertRaisesRegexp(FatalError, ref_exception_msg):
+ for future in futures:
+ future.result()
+
+ @skip_if_using_serial_implementation(
+ 'Exception is thrown once all transfers are submitted. '
+ 'However for the serial implementation, transfers are performed '
+ 'in main thread meaning all transfers will complete before the '
+ 'exception being thrown.'
+ )
+ def test_cntrl_c_in_context_manager_cancels_incomplete_transfers(self):
+ # The purpose of this test is to make sure if an error is raised
+ # in the body of the context manager, incomplete transfers will
+ # be cancelled with value of the exception wrapped by a CancelledError
+
+ # NOTE: The fact that delete() was chosen to test this is arbitrary
+ # other than it is the easiet to set up for the stubber.
+ # The specific operation is not important to the purpose of this test.
+ num_transfers = 100
+ futures = []
+
+ for _ in range(num_transfers):
+ self.stubber.add_response('delete_object', {})
+
+ manager = TransferManager(
+ self.client,
+ TransferConfig(
+ max_request_concurrency=1, max_submission_concurrency=1)
+ )
+ try:
+ with manager:
+ for i in range(num_transfers):
+ futures.append(manager.delete('mybucket', 'mykey'))
+ raise KeyboardInterrupt()
+ except KeyboardInterrupt:
+ # At least one of the submitted futures should have been
+ # cancelled.
+ with self.assertRaisesRegexp(
+ CancelledError, 'KeyboardInterrupt()'):
+ for future in futures:
+ future.result()
+
+ def test_enable_disable_callbacks_only_ever_registered_once(self):
+ body = SignalTransferringBody()
+ request = create_request_object({
+ 'method': 'PUT',
+ 'url': 'https://s3.amazonaws.com',
+ 'body': body,
+ 'headers': {},
+ 'context': {}
+ })
+ # Create two TransferManager's using the same client
+ TransferManager(self.client)
+ TransferManager(self.client)
+ self.client.meta.events.emit(
+ 'request-created.s3', request=request, operation_name='PutObject')
+ # The client should have only have the enable/disable callback
+ # handlers registered once depite being used for two different
+ # TransferManagers.
+ self.assertEqual(
+ body.signal_transferring_call_count, 1,
+ 'The enable_callback() should have only ever been registered once')
+ self.assertEqual(
+ body.signal_not_transferring_call_count, 1,
+ 'The disable_callback() should have only ever been registered '
+ 'once')
+
+ def test_use_custom_executor_implementation(self):
+ mocked_executor_cls = mock.Mock(BaseExecutor)
+ transfer_manager = TransferManager(
+ self.client, executor_cls=mocked_executor_cls)
+ transfer_manager.delete('bucket', 'key')
+ self.assertTrue(mocked_executor_cls.return_value.submit.called)
+
+ def test_unicode_exception_in_context_manager(self):
+ with self.assertRaises(ArbitraryException):
+ with TransferManager(self.client):
+ raise ArbitraryException(u'\u2713')
+
+ def test_client_property(self):
+ manager = TransferManager(self.client)
+ self.assertIs(manager.client, self.client)
+
+ def test_config_property(self):
+ config = TransferConfig()
+ manager = TransferManager(self.client, config)
+ self.assertIs(manager.config, config)
+
+ def test_can_disable_bucket_validation(self):
+ s3_object_lambda_arn = (
+ 'arn:aws:s3-object-lambda:us-west-2:123456789012:'
+ 'accesspoint:my-accesspoint'
+ )
+ config = TransferConfig()
+ manager = TransferManager(self.client, config)
+ manager.VALIDATE_SUPPORTED_BUCKET_VALUES = False
+ manager.delete(s3_object_lambda_arn, 'my-key')