1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
|
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License'). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the 'license' file accompanying this file. This file is
# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from io import BytesIO
from botocore.awsrequest import create_request_object
import mock
from __tests__ import skip_if_using_serial_implementation
from __tests__ import StubbedClientTest
from s3transfer.exceptions import CancelledError
from s3transfer.exceptions import FatalError
from s3transfer.futures import BaseExecutor
from s3transfer.manager import TransferManager
from s3transfer.manager import TransferConfig
class ArbitraryException(Exception):
pass
class SignalTransferringBody(BytesIO):
"""A mocked body with the ability to signal when transfers occur"""
def __init__(self):
super(SignalTransferringBody, self).__init__()
self.signal_transferring_call_count = 0
self.signal_not_transferring_call_count = 0
def signal_transferring(self):
self.signal_transferring_call_count += 1
def signal_not_transferring(self):
self.signal_not_transferring_call_count += 1
def seek(self, where, whence=0):
pass
def tell(self):
return 0
def read(self, amount=0):
return b''
class TestTransferManager(StubbedClientTest):
@skip_if_using_serial_implementation(
'Exception is thrown once all transfers are submitted. '
'However for the serial implementation, transfers are performed '
'in main thread meaning all transfers will complete before the '
'exception being thrown.'
)
def test_error_in_context_manager_cancels_incomplete_transfers(self):
# The purpose of this test is to make sure if an error is raised
# in the body of the context manager, incomplete transfers will
# be cancelled with value of the exception wrapped by a CancelledError
# NOTE: The fact that delete() was chosen to test this is arbitrary
# other than it is the easiet to set up for the stubber.
# The specific operation is not important to the purpose of this test.
num_transfers = 100
futures = []
ref_exception_msg = 'arbitrary exception'
for _ in range(num_transfers):
self.stubber.add_response('delete_object', {})
manager = TransferManager(
self.client,
TransferConfig(
max_request_concurrency=1, max_submission_concurrency=1)
)
try:
with manager:
for i in range(num_transfers):
futures.append(manager.delete('mybucket', 'mykey'))
raise ArbitraryException(ref_exception_msg)
except ArbitraryException:
# At least one of the submitted futures should have been
# cancelled.
with self.assertRaisesRegexp(FatalError, ref_exception_msg):
for future in futures:
future.result()
@skip_if_using_serial_implementation(
'Exception is thrown once all transfers are submitted. '
'However for the serial implementation, transfers are performed '
'in main thread meaning all transfers will complete before the '
'exception being thrown.'
)
def test_cntrl_c_in_context_manager_cancels_incomplete_transfers(self):
# The purpose of this test is to make sure if an error is raised
# in the body of the context manager, incomplete transfers will
# be cancelled with value of the exception wrapped by a CancelledError
# NOTE: The fact that delete() was chosen to test this is arbitrary
# other than it is the easiet to set up for the stubber.
# The specific operation is not important to the purpose of this test.
num_transfers = 100
futures = []
for _ in range(num_transfers):
self.stubber.add_response('delete_object', {})
manager = TransferManager(
self.client,
TransferConfig(
max_request_concurrency=1, max_submission_concurrency=1)
)
try:
with manager:
for i in range(num_transfers):
futures.append(manager.delete('mybucket', 'mykey'))
raise KeyboardInterrupt()
except KeyboardInterrupt:
# At least one of the submitted futures should have been
# cancelled.
with self.assertRaisesRegexp(
CancelledError, 'KeyboardInterrupt()'):
for future in futures:
future.result()
def test_enable_disable_callbacks_only_ever_registered_once(self):
body = SignalTransferringBody()
request = create_request_object({
'method': 'PUT',
'url': 'https://s3.amazonaws.com',
'body': body,
'headers': {},
'context': {}
})
# Create two TransferManager's using the same client
TransferManager(self.client)
TransferManager(self.client)
self.client.meta.events.emit(
'request-created.s3', request=request, operation_name='PutObject')
# The client should have only have the enable/disable callback
# handlers registered once depite being used for two different
# TransferManagers.
self.assertEqual(
body.signal_transferring_call_count, 1,
'The enable_callback() should have only ever been registered once')
self.assertEqual(
body.signal_not_transferring_call_count, 1,
'The disable_callback() should have only ever been registered '
'once')
def test_use_custom_executor_implementation(self):
mocked_executor_cls = mock.Mock(BaseExecutor)
transfer_manager = TransferManager(
self.client, executor_cls=mocked_executor_cls)
transfer_manager.delete('bucket', 'key')
self.assertTrue(mocked_executor_cls.return_value.submit.called)
def test_unicode_exception_in_context_manager(self):
with self.assertRaises(ArbitraryException):
with TransferManager(self.client):
raise ArbitraryException(u'\u2713')
def test_client_property(self):
manager = TransferManager(self.client)
self.assertIs(manager.client, self.client)
def test_config_property(self):
config = TransferConfig()
manager = TransferManager(self.client, config)
self.assertIs(manager.config, config)
def test_can_disable_bucket_validation(self):
s3_object_lambda_arn = (
'arn:aws:s3-object-lambda:us-west-2:123456789012:'
'accesspoint:my-accesspoint'
)
config = TransferConfig()
manager = TransferManager(self.client, config)
manager.VALIDATE_SUPPORTED_BUCKET_VALUES = False
manager.delete(s3_object_lambda_arn, 'my-key')
|