1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
"""EBSBackend class with methods for supported APIs."""
from moto.core import BaseBackend, BaseModel
from moto.core.utils import BackendDict, unix_time
from moto.ec2 import ec2_backends
from moto.ec2.models.elastic_block_store import Snapshot
from moto.moto_api._internal import mock_random
class Block(BaseModel):
def __init__(self, block_data, checksum, checksum_algorithm, data_length):
self.block_data = block_data
self.checksum = checksum
self.checksum_algorithm = checksum_algorithm
self.data_length = data_length
self.block_token = str(mock_random.uuid4())
class EBSSnapshot(BaseModel):
def __init__(self, account_id, snapshot: Snapshot):
self.account_id = account_id
self.snapshot_id = snapshot.id
self.status = "pending"
self.start_time = unix_time()
self.volume_size = snapshot.volume.size
self.block_size = 512
self.tags = [
{"Key": t["key"], "Value": t["value"]} for t in snapshot.get_tags()
]
self.description = snapshot.description
self.blocks = dict()
def put_block(
self, block_idx, block_data, checksum, checksum_algorithm, data_length
):
block = Block(block_data, checksum, checksum_algorithm, data_length)
self.blocks[block_idx] = block
def to_json(self):
return {
"SnapshotId": self.snapshot_id,
"OwnerId": self.account_id,
"Status": self.status,
"StartTime": self.start_time,
"VolumeSize": self.volume_size,
"BlockSize": self.block_size,
"Tags": self.tags,
"Description": self.description,
}
class EBSBackend(BaseBackend):
"""Implementation of EBS APIs."""
def __init__(self, region_name, account_id):
super().__init__(region_name, account_id)
self.snapshots = dict()
@property
def ec2_backend(self):
return ec2_backends[self.account_id][self.region_name]
def start_snapshot(self, volume_size, tags, description):
zone_name = f"{self.region_name}a"
vol = self.ec2_backend.create_volume(size=volume_size, zone_name=zone_name)
snapshot = self.ec2_backend.create_snapshot(
volume_id=vol.id, description=description
)
if tags:
tags = {tag["Key"]: tag["Value"] for tag in tags}
snapshot.add_tags(tags)
ebs_snapshot = EBSSnapshot(account_id=self.account_id, snapshot=snapshot)
self.snapshots[ebs_snapshot.snapshot_id] = ebs_snapshot
return ebs_snapshot
def complete_snapshot(self, snapshot_id):
self.snapshots[snapshot_id].status = "completed"
return {"Status": "completed"}
def put_snapshot_block(
self,
snapshot_id,
block_index,
block_data,
checksum,
checksum_algorithm,
data_length,
):
snapshot = self.snapshots[snapshot_id]
snapshot.put_block(
block_index, block_data, checksum, checksum_algorithm, data_length
)
return checksum, checksum_algorithm
def get_snapshot_block(self, snapshot_id, block_index):
"""
The BlockToken-parameter is not yet implemented
"""
snapshot = self.snapshots[snapshot_id]
return snapshot.blocks[block_index]
def list_changed_blocks(self, first_snapshot_id, second_snapshot_id):
"""
The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex
"""
snapshot1 = self.snapshots[first_snapshot_id]
snapshot2 = self.snapshots[second_snapshot_id]
changed_blocks = dict() # {idx: (token1, token2), ..}
for idx in snapshot1.blocks:
block1 = snapshot1.blocks[idx]
if idx in snapshot2.blocks:
block2 = snapshot2.blocks[idx]
if block1.block_data != block2.block_data:
changed_blocks[idx] = (block1.block_token, block2.block_token)
else:
changed_blocks[idx] = (block1.block_token, None)
return changed_blocks, snapshot1
def list_snapshot_blocks(self, snapshot_id):
return self.snapshots[snapshot_id]
ebs_backends = BackendDict(EBSBackend, "ebs")
|