aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/moto/py3/moto/dax/responses.py
blob: 3ebd66026c3e824fa59a6dd340a4d6e5e0c4b82e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import json
import re

from moto.core.responses import BaseResponse
from .exceptions import InvalidParameterValueException
from .models import dax_backends


class DAXResponse(BaseResponse):
    """Request handlers for the mocked DAX API.

    Every public handler decodes the JSON request body (``self.body``),
    delegates the work to the backend of the current region and returns the
    JSON-encoded response body as a string.
    """

    @property
    def dax_backend(self):
        """Return the entry of ``dax_backends`` for ``self.region``."""
        return dax_backends[self.region]

    def create_cluster(self) -> str:
        """Handle ``CreateCluster``.

        Validates ``IamRoleArn`` and ``ClusterName`` before asking the backend
        to create the cluster.

        Returns:
            JSON string of the form ``{"Cluster": {...}}``.

        Raises:
            InvalidParameterValueException: if the IAM role ARN or the cluster
                name is missing or malformed.
        """
        params = json.loads(self.body)
        cluster_name = params.get("ClusterName")
        node_type = params.get("NodeType")
        description = params.get("Description")
        replication_factor = params.get("ReplicationFactor")
        iam_role_arn = params.get("IamRoleArn")
        tags = params.get("Tags", [])
        sse_specification = params.get("SSESpecification", {})
        encryption_type = params.get("ClusterEndpointEncryptionType", "NONE")

        # The ARN is checked first, so when both values are invalid the ARN
        # error is the one that gets reported.
        self._validate_arn(iam_role_arn)
        self._validate_name(cluster_name)

        cluster = self.dax_backend.create_cluster(
            cluster_name=cluster_name,
            node_type=node_type,
            description=description,
            replication_factor=replication_factor,
            iam_role_arn=iam_role_arn,
            tags=tags,
            sse_specification=sse_specification,
            encryption_type=encryption_type,
        )
        return json.dumps(dict(Cluster=cluster.to_json()))

    def delete_cluster(self) -> str:
        """Handle ``DeleteCluster``.

        Returns:
            JSON string of the form ``{"Cluster": {...}}`` built from the
            object returned by the backend's ``delete_cluster``.
        """
        cluster_name = json.loads(self.body).get("ClusterName")
        cluster = self.dax_backend.delete_cluster(cluster_name)
        return json.dumps(dict(Cluster=cluster.to_json()))

    def describe_clusters(self) -> str:
        """Handle ``DescribeClusters``.

        Each requested cluster name is validated before the backend is
        queried. ``MaxResults`` and ``NextToken`` are forwarded unchanged.

        Returns:
            JSON string with the keys ``Clusters`` and ``NextToken``.

        Raises:
            InvalidParameterValueException: if any requested name is invalid.
        """
        params = json.loads(self.body)
        # ``or []`` also covers an explicit JSON ``null``, which would
        # otherwise fail when iterated below.
        cluster_names = params.get("ClusterNames") or []
        max_results = params.get("MaxResults")
        next_token = params.get("NextToken")

        for name in cluster_names:
            self._validate_name(name)

        clusters, next_token = self.dax_backend.describe_clusters(
            cluster_names=cluster_names, max_results=max_results, next_token=next_token
        )
        return json.dumps(
            {"Clusters": [c.to_json() for c in clusters], "NextToken": next_token}
        )

    def _validate_arn(self, arn) -> None:
        """Check that ``arn`` has the ``arn:`` prefix and six colon-separated parts.

        Raises:
            InvalidParameterValueException: if ``arn`` is not a string, lacks
                the ``arn:`` prefix, or has fewer than six sections. The
                message names the first missing section.
        """
        # A missing parameter arrives here as None; report it as an invalid
        # value instead of failing with AttributeError.
        if not isinstance(arn, str) or not arn.startswith("arn:"):
            raise InvalidParameterValueException(f"ARNs must start with 'arn:': {arn}")
        sections = arn.split(":")
        if len(sections) < 3:
            raise InvalidParameterValueException(
                f"Second colon partition not found: {arn}"
            )
        if len(sections) < 4:
            raise InvalidParameterValueException(f"Third colon vendor not found: {arn}")
        if len(sections) < 5:
            raise InvalidParameterValueException(
                f"Fourth colon (region/namespace delimiter) not found: {arn}"
            )
        if len(sections) < 6:
            raise InvalidParameterValueException(
                f"Fifth colon (namespace/relative-id delimiter) not found: {arn}"
            )

    def _validate_name(self, name) -> None:
        """Check that ``name`` is an acceptable cluster identifier.

        A name must start with a lowercase letter, contain only lowercase
        letters, digits and hyphens, must not end with a hyphen and must not
        contain two consecutive hyphens. Names of a single character are
        accepted.

        Raises:
            InvalidParameterValueException: if ``name`` is not a string or
                breaks any of the rules above.
        """
        msg = "Cluster ID specified is not a valid identifier. Identifiers must begin with a letter; must contain only ASCII letters, digits, and hyphens; and must not end with a hyphen or contain two consecutive hyphens."
        # fullmatch (rather than match + "$") refuses a trailing newline, and
        # the optional group lets one- and two-character names through.
        if not isinstance(name, str) or not re.fullmatch(
            r"[a-z](?:[a-z0-9-]*[a-z0-9])?", name
        ):
            raise InvalidParameterValueException(msg)
        if "--" in name:
            raise InvalidParameterValueException(msg)

    def list_tags(self) -> str:
        """Handle ``ListTags``.

        Returns:
            JSON encoding of whatever the backend's ``list_tags`` returns for
            the requested ``ResourceName``.
        """
        params = json.loads(self.body)
        resource_name = params.get("ResourceName")
        tags = self.dax_backend.list_tags(resource_name=resource_name)
        return json.dumps(tags)

    def increase_replication_factor(self) -> str:
        """Handle ``IncreaseReplicationFactor``.

        Returns:
            JSON string of the form ``{"Cluster": {...}}``.
        """
        params = json.loads(self.body)
        cluster_name = params.get("ClusterName")
        new_replication_factor = params.get("NewReplicationFactor")
        cluster = self.dax_backend.increase_replication_factor(
            cluster_name=cluster_name, new_replication_factor=new_replication_factor
        )
        return json.dumps({"Cluster": cluster.to_json()})

    def decrease_replication_factor(self) -> str:
        """Handle ``DecreaseReplicationFactor``.

        ``NodeIdsToRemove`` is forwarded to the backend as-is (``None`` when
        the request does not contain it).

        Returns:
            JSON string of the form ``{"Cluster": {...}}``.
        """
        params = json.loads(self.body)
        cluster_name = params.get("ClusterName")
        new_replication_factor = params.get("NewReplicationFactor")
        node_ids_to_remove = params.get("NodeIdsToRemove")
        cluster = self.dax_backend.decrease_replication_factor(
            cluster_name=cluster_name,
            new_replication_factor=new_replication_factor,
            node_ids_to_remove=node_ids_to_remove,
        )
        return json.dumps({"Cluster": cluster.to_json()})