1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
import json
import re
from moto.core.responses import BaseResponse
from .exceptions import InvalidParameterValueException
from .models import dax_backends
class DAXResponse(BaseResponse):
    """Request handlers for the DAX operations implemented in this module.

    Each handler decodes the JSON request body (``self.body``), delegates to
    the backend looked up via :attr:`dax_backend`, and returns a JSON-encoded
    response string.
    """

    # A lowercase letter, optionally followed by lowercase letters, digits or
    # hyphens, where the last character is a letter or digit. Used with
    # ``fullmatch`` so that one- and two-character names are accepted and a
    # trailing newline is not silently tolerated.
    _CLUSTER_NAME_PATTERN = re.compile(r"[a-z](?:[a-z0-9-]*[a-z0-9])?")

    @property
    def dax_backend(self):
        """Return the ``dax_backends`` entry for ``self.region``."""
        return dax_backends[self.region]

    def create_cluster(self) -> str:
        """Create a cluster from the request parameters.

        The IAM role ARN and the cluster name are validated (in that order)
        before the backend is called.

        Returns:
            JSON document with the new cluster under the ``Cluster`` key.

        Raises:
            InvalidParameterValueException: if the ARN or the cluster name
                fails validation.
        """
        params = json.loads(self.body)
        cluster_name = params.get("ClusterName")
        iam_role_arn = params.get("IamRoleArn")

        self._validate_arn(iam_role_arn)
        self._validate_name(cluster_name)

        cluster = self.dax_backend.create_cluster(
            cluster_name=cluster_name,
            node_type=params.get("NodeType"),
            description=params.get("Description"),
            replication_factor=params.get("ReplicationFactor"),
            iam_role_arn=iam_role_arn,
            tags=params.get("Tags", []),
            sse_specification=params.get("SSESpecification", {}),
            encryption_type=params.get("ClusterEndpointEncryptionType", "NONE"),
        )
        return json.dumps({"Cluster": cluster.to_json()})

    def delete_cluster(self) -> str:
        """Delete the cluster named by ``ClusterName``.

        Returns:
            JSON document with the backend's returned cluster under the
            ``Cluster`` key.
        """
        cluster_name = json.loads(self.body).get("ClusterName")
        cluster = self.dax_backend.delete_cluster(cluster_name)
        return json.dumps({"Cluster": cluster.to_json()})

    def describe_clusters(self) -> str:
        """Describe clusters, optionally restricted to ``ClusterNames``.

        Every requested name is validated before the backend is queried.
        ``MaxResults`` and ``NextToken`` are passed through to the backend.

        Returns:
            JSON document with the keys ``Clusters`` and ``NextToken``.

        Raises:
            InvalidParameterValueException: if any requested name is invalid.
        """
        params = json.loads(self.body)
        # ``or []`` also covers an explicit JSON ``null``, which would
        # otherwise make the validation loop fail with a TypeError.
        cluster_names = params.get("ClusterNames") or []
        max_results = params.get("MaxResults")
        next_token = params.get("NextToken")

        for name in cluster_names:
            self._validate_name(name)

        clusters, next_token = self.dax_backend.describe_clusters(
            cluster_names=cluster_names,
            max_results=max_results,
            next_token=next_token,
        )
        return json.dumps(
            {"Clusters": [c.to_json() for c in clusters], "NextToken": next_token}
        )

    def _validate_arn(self, arn) -> None:
        """Check that ``arn`` has the overall shape of an ARN.

        Only the ``arn:`` prefix and the number of colon-separated sections
        (at least six) are checked, not the content of each section.

        Raises:
            InvalidParameterValueException: if ``arn`` is not a string, does
                not start with ``arn:``, or has too few sections.
        """
        # A missing parameter arrives as None; report it as an invalid value
        # instead of failing with an AttributeError.
        if not isinstance(arn, str) or not arn.startswith("arn:"):
            raise InvalidParameterValueException(f"ARNs must start with 'arn:': {arn}")

        sections = arn.split(":")
        # (minimum number of sections, problem reported when there are fewer);
        # checked in order so the first missing delimiter is the one reported.
        requirements = (
            (3, "Second colon partition not found"),
            (4, "Third colon vendor not found"),
            (5, "Fourth colon (region/namespace delimiter) not found"),
            (6, "Fifth colon (namespace/relative-id delimiter) not found"),
        )
        for minimum, problem in requirements:
            if len(sections) < minimum:
                raise InvalidParameterValueException(f"{problem}: {arn}")

    def _validate_name(self, name) -> None:
        """Check that ``name`` is an acceptable cluster identifier.

        A valid name starts with a lowercase letter, contains only lowercase
        letters, digits and hyphens, does not end with a hyphen and does not
        contain two consecutive hyphens. Names of one or two characters are
        accepted.

        Raises:
            InvalidParameterValueException: if ``name`` is not a string or
                violates any of the rules above.
        """
        msg = (
            "Cluster ID specified is not a valid identifier. "
            "Identifiers must begin with a letter; "
            "must contain only ASCII letters, digits, and hyphens; "
            "and must not end with a hyphen or contain two consecutive hyphens."
        )
        if (
            not isinstance(name, str)
            or self._CLUSTER_NAME_PATTERN.fullmatch(name) is None
            or "--" in name
        ):
            raise InvalidParameterValueException(msg)

    def list_tags(self) -> str:
        """Return the backend's tag listing for ``ResourceName`` as JSON."""
        params = json.loads(self.body)
        resource_name = params.get("ResourceName")
        tags = self.dax_backend.list_tags(resource_name=resource_name)
        return json.dumps(tags)

    def increase_replication_factor(self) -> str:
        """Raise a cluster's replication factor to ``NewReplicationFactor``.

        Returns:
            JSON document with the updated cluster under the ``Cluster`` key.
        """
        params = json.loads(self.body)
        cluster = self.dax_backend.increase_replication_factor(
            cluster_name=params.get("ClusterName"),
            new_replication_factor=params.get("NewReplicationFactor"),
        )
        return json.dumps({"Cluster": cluster.to_json()})

    def decrease_replication_factor(self) -> str:
        """Lower a cluster's replication factor to ``NewReplicationFactor``.

        ``NodeIdsToRemove`` is forwarded to the backend unchanged (``None``
        when the request omits it).

        Returns:
            JSON document with the updated cluster under the ``Cluster`` key.
        """
        params = json.loads(self.body)
        cluster = self.dax_backend.decrease_replication_factor(
            cluster_name=params.get("ClusterName"),
            new_replication_factor=params.get("NewReplicationFactor"),
            node_ids_to_remove=params.get("NodeIdsToRemove"),
        )
        return json.dumps({"Cluster": cluster.to_json()})
|