aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/moto/py3/moto/ebs/models.py
blob: aa088329403277f20edea2d7f7f297e96583f164 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""EBSBackend class with methods for supported APIs."""

from moto.core import BaseBackend, BaseModel
from moto.core.utils import BackendDict, unix_time
from moto.ec2 import ec2_backends
from moto.ec2.models.elastic_block_store import Snapshot
from moto.moto_api._internal import mock_random


class Block(BaseModel):
    def __init__(self, block_data, checksum, checksum_algorithm, data_length):
        self.block_data = block_data
        self.checksum = checksum
        self.checksum_algorithm = checksum_algorithm
        self.data_length = data_length
        self.block_token = str(mock_random.uuid4())


class EBSSnapshot(BaseModel):
    def __init__(self, account_id, snapshot: Snapshot):
        self.account_id = account_id
        self.snapshot_id = snapshot.id
        self.status = "pending"
        self.start_time = unix_time()
        self.volume_size = snapshot.volume.size
        self.block_size = 512
        self.tags = [
            {"Key": t["key"], "Value": t["value"]} for t in snapshot.get_tags()
        ]
        self.description = snapshot.description

        self.blocks = dict()

    def put_block(
        self, block_idx, block_data, checksum, checksum_algorithm, data_length
    ):
        block = Block(block_data, checksum, checksum_algorithm, data_length)
        self.blocks[block_idx] = block

    def to_json(self):
        return {
            "SnapshotId": self.snapshot_id,
            "OwnerId": self.account_id,
            "Status": self.status,
            "StartTime": self.start_time,
            "VolumeSize": self.volume_size,
            "BlockSize": self.block_size,
            "Tags": self.tags,
            "Description": self.description,
        }


class EBSBackend(BaseBackend):
    """Implementation of EBS APIs."""

    def __init__(self, region_name, account_id):
        super().__init__(region_name, account_id)
        self.snapshots = dict()

    @property
    def ec2_backend(self):
        return ec2_backends[self.account_id][self.region_name]

    def start_snapshot(self, volume_size, tags, description):
        zone_name = f"{self.region_name}a"
        vol = self.ec2_backend.create_volume(size=volume_size, zone_name=zone_name)
        snapshot = self.ec2_backend.create_snapshot(
            volume_id=vol.id, description=description
        )
        if tags:
            tags = {tag["Key"]: tag["Value"] for tag in tags}
            snapshot.add_tags(tags)
        ebs_snapshot = EBSSnapshot(account_id=self.account_id, snapshot=snapshot)
        self.snapshots[ebs_snapshot.snapshot_id] = ebs_snapshot
        return ebs_snapshot

    def complete_snapshot(self, snapshot_id):
        self.snapshots[snapshot_id].status = "completed"
        return {"Status": "completed"}

    def put_snapshot_block(
        self,
        snapshot_id,
        block_index,
        block_data,
        checksum,
        checksum_algorithm,
        data_length,
    ):
        snapshot = self.snapshots[snapshot_id]
        snapshot.put_block(
            block_index, block_data, checksum, checksum_algorithm, data_length
        )
        return checksum, checksum_algorithm

    def get_snapshot_block(self, snapshot_id, block_index):
        """
        The BlockToken-parameter is not yet implemented
        """
        snapshot = self.snapshots[snapshot_id]
        return snapshot.blocks[block_index]

    def list_changed_blocks(self, first_snapshot_id, second_snapshot_id):
        """
        The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex
        """
        snapshot1 = self.snapshots[first_snapshot_id]
        snapshot2 = self.snapshots[second_snapshot_id]
        changed_blocks = dict()  # {idx: (token1, token2), ..}
        for idx in snapshot1.blocks:
            block1 = snapshot1.blocks[idx]
            if idx in snapshot2.blocks:
                block2 = snapshot2.blocks[idx]
                if block1.block_data != block2.block_data:
                    changed_blocks[idx] = (block1.block_token, block2.block_token)
            else:
                changed_blocks[idx] = (block1.block_token, None)

        return changed_blocks, snapshot1

    def list_snapshot_blocks(self, snapshot_id):
        return self.snapshots[snapshot_id]


ebs_backends = BackendDict(EBSBackend, "ebs")