aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/s3transfer/py3/tests/unit/test_manager.py
blob: fc3caa843fc33b33ed9379df7c8f9e7056142fd7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import time
from concurrent.futures import ThreadPoolExecutor

from s3transfer.exceptions import CancelledError, FatalError
from s3transfer.futures import TransferCoordinator
from s3transfer.manager import TransferConfig, TransferCoordinatorController
from __tests__ import TransferCoordinatorWithInterrupt, unittest


class FutureResultException(Exception):
    """Marker exception set on a transfer coordinator by these tests.

    Using a dedicated type lets a test tell "the failure I injected"
    apart from any other exception.
    """


class TestTransferConfig(unittest.TestCase):
    """Checks on ``TransferConfig`` construction."""

    def test_exception_on_zero_attr_value(self):
        # Constructing the config with a zero-valued attribute must be
        # rejected with a ValueError.
        invalid_kwargs = {'max_request_queue_size': 0}
        self.assertRaises(ValueError, TransferConfig, **invalid_kwargs)


class TestTransferCoordinatorController(unittest.TestCase):
    """Unit tests for ``TransferCoordinatorController``.

    Every test runs against a fresh controller created in ``setUp``.
    """

    def setUp(self):
        self.coordinator_controller = TransferCoordinatorController()

    def sleep_then_announce_done(self, transfer_coordinator, sleep_time):
        """Sleep, then set a result on the coordinator and announce it done.

        Intended to be run in a worker thread so the main thread can
        block on the controller in the meantime.

        :param transfer_coordinator: The coordinator to complete.
        :param sleep_time: Seconds to sleep before completing it.
        """
        time.sleep(sleep_time)
        transfer_coordinator.set_result('done')
        transfer_coordinator.announce_done()

    def assert_coordinator_is_cancelled(self, transfer_coordinator):
        """Assert that the coordinator's status is ``'cancelled'``."""
        self.assertEqual(transfer_coordinator.status, 'cancelled')

    def test_add_transfer_coordinator(self):
        transfer_coordinator = TransferCoordinator()
        # Add the transfer coordinator
        self.coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )
        # Ensure that it is tracked.
        self.assertEqual(
            self.coordinator_controller.tracked_transfer_coordinators,
            {transfer_coordinator},
        )

    def test_remove_transfer_coordinator(self):
        transfer_coordinator = TransferCoordinator()
        # Add the coordinator
        self.coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )
        # Now remove the coordinator
        self.coordinator_controller.remove_transfer_coordinator(
            transfer_coordinator
        )
        # Make sure that it is no longer getting tracked.
        self.assertEqual(
            self.coordinator_controller.tracked_transfer_coordinators, set()
        )

    def test_cancel(self):
        transfer_coordinator = TransferCoordinator()
        # Add the transfer coordinator
        self.coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )
        # Cancel through the controller
        self.coordinator_controller.cancel()
        # Check that the coordinator got cancelled
        self.assert_coordinator_is_cancelled(transfer_coordinator)

    def test_cancel_with_message(self):
        message = 'my cancel message'
        transfer_coordinator = TransferCoordinator()
        self.coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )
        self.coordinator_controller.cancel(message)
        # Announce done so that result() below does not block.
        transfer_coordinator.announce_done()
        with self.assertRaisesRegex(CancelledError, message):
            transfer_coordinator.result()

    def test_cancel_with_provided_exception(self):
        message = 'my cancel message'
        transfer_coordinator = TransferCoordinator()
        self.coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )
        self.coordinator_controller.cancel(message, exc_type=FatalError)
        # Announce done so that result() below does not block.
        transfer_coordinator.announce_done()
        with self.assertRaisesRegex(FatalError, message):
            transfer_coordinator.result()

    def test_wait_for_done_transfer_coordinators(self):
        # Create a coordinator and add it to the controller
        transfer_coordinator = TransferCoordinator()
        self.coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )

        sleep_time = 0.02
        with ThreadPoolExecutor(max_workers=1) as executor:
            # In a separate thread sleep and then set the transfer coordinator
            # to done after sleeping.
            start_time = time.time()
            executor.submit(
                self.sleep_then_announce_done, transfer_coordinator, sleep_time
            )
            # Now call wait to wait for the transfer coordinator to be done.
            self.coordinator_controller.wait()
            end_time = time.time()
            wait_time = end_time - start_time
        # The time waited should not be less than the time it took to sleep in
        # the separate thread because the wait ending should be dependent on
        # the sleeping thread announcing that the transfer coordinator is done.
        # assertLessEqual reports both values if the check fails.
        self.assertLessEqual(sleep_time, wait_time)

    def test_wait_does_not_propogate_exceptions_from_result(self):
        transfer_coordinator = TransferCoordinator()
        transfer_coordinator.set_exception(FutureResultException())
        transfer_coordinator.announce_done()
        # The failed coordinator has to be registered with the controller;
        # otherwise wait() never sees it and this test could not fail.
        self.coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )
        try:
            self.coordinator_controller.wait()
        except FutureResultException as e:
            self.fail('%s should not have been raised.' % e)

    def test_wait_can_be_interrupted(self):
        inject_interrupt_coordinator = TransferCoordinatorWithInterrupt()
        self.coordinator_controller.add_transfer_coordinator(
            inject_interrupt_coordinator
        )
        # Unlike ordinary exceptions, a KeyboardInterrupt is expected to
        # escape from wait().
        with self.assertRaises(KeyboardInterrupt):
            self.coordinator_controller.wait()