1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
import os
import json
from six import iteritems
from . import constants
class PackageJsonWorkspaceError(RuntimeError):
pass
class PackageJson(object):
DEP_KEY = "dependencies"
DEV_DEP_KEY = "devDependencies"
PEER_DEP_KEY = "peerDependencies"
OPT_DEP_KEY = "optionalDependencies"
DEP_KEYS = (DEP_KEY, DEV_DEP_KEY, PEER_DEP_KEY, OPT_DEP_KEY)
WORKSPACE_SCHEMA = "workspace:"
@classmethod
def load(cls, path):
"""
:param path: package.json path
:type path: str
:rtype: PackageJson
"""
pj = cls(path)
pj.read()
return pj
def __init__(self, path):
if not os.path.isabs(path):
raise TypeError("Absolute path required, given: {}".format(path))
self.path = path
self.data = None
def read(self):
with open(self.path) as f:
self.data = json.load(f)
def get_name(self):
return self.data.get("name")
def get_workspace_dep_paths(self):
"""
:return: Workspace dependencies.
:rtype: list of (str, str)
"""
dep_paths = []
schema = self.WORKSPACE_SCHEMA
schema_len = len(schema)
for deps in map(lambda x: self.data.get(x), self.DEP_KEYS):
if not deps:
continue
for name, spec in iteritems(deps):
if not spec.startswith(schema):
continue
spec_path = spec[schema_len:]
if not (spec_path.startswith(".") or spec_path.startswith("..")):
raise PackageJsonWorkspaceError(
"Expected relative path specifier for workspace dependency, but got '{}' for {} in {}".format(spec, name, self.path))
dep_paths.append((name, spec_path))
return dep_paths
def get_workspace_deps(self):
"""
:rtype: list of PackageJson
"""
ws_deps = []
pj_dir = os.path.dirname(self.path)
for (name, rel_path) in self.get_workspace_dep_paths():
dep_path = os.path.normpath(os.path.join(pj_dir, rel_path))
dep_pj = PackageJson.load(os.path.join(dep_path, constants.PACKAGE_JSON_FILENAME))
if name != dep_pj.get_name():
raise PackageJsonWorkspaceError(
"Workspace dependency name mismatch, found '{}' instead of '{}' in {}".format(name, dep_pj.get_name(), self.path))
ws_deps.append(dep_pj)
return ws_deps
def get_workspace_map(self):
"""
:return: Absolute paths of workspace dependencies (including transitive) mapped to package.json and depth.
:rtype: dict of (PackageJson, int)
"""
ws_deps = {}
# list of (pj, depth)
pj_queue = [(self, 0)]
while len(pj_queue):
(pj, depth) = pj_queue.pop()
pj_dir = os.path.dirname(pj.path)
if pj_dir in ws_deps:
continue
ws_deps[pj_dir] = (pj, depth)
for dep_pj in pj.get_workspace_deps():
pj_queue.append((dep_pj, depth + 1))
return ws_deps
|