1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
|
import json
import logging
import os
from six import iteritems
from .utils import build_pj_path
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class PackageJsonWorkspaceError(RuntimeError):
    """Error in the declaration of a workspace dependency of a package.json."""
class PackageJson(object):
    """
    In-memory representation of a single ``package.json`` file.

    ``self.path`` is the absolute path of the file, ``self.data`` is the parsed JSON document
    (``None`` until :meth:`read` has been called).
    """

    DEP_KEY = "dependencies"
    DEV_DEP_KEY = "devDependencies"
    PEER_DEP_KEY = "peerDependencies"
    OPT_DEP_KEY = "optionalDependencies"
    # Sections scanned (in this order) by dependencies_iter().
    DEP_KEYS = (DEP_KEY, DEV_DEP_KEY, PEER_DEP_KEY, OPT_DEP_KEY)

    # Specifier prefix that marks a dependency as a workspace one, e.g. "workspace:../lib".
    WORKSPACE_SCHEMA = "workspace:"

    @classmethod
    def load(cls, path):
        """
        Creates an instance for the given path and reads the file right away.

        :param path: package.json path
        :type path: str
        :rtype: PackageJson
        """
        pj = cls(path)
        pj.read()

        return pj

    def __init__(self, path):
        """
        :param path: absolute path of package.json
        :type path: str
        :raises TypeError: if the path is not absolute
        """
        if not os.path.isabs(path):
            raise TypeError("Absolute path required, given: {}".format(path))

        self.path = path
        self.data = None

    def read(self):
        """
        Parses the file at ``self.path`` and stores the result in ``self.data``.
        """
        with open(self.path) as f:
            self.data = json.load(f)

    def write(self, path=None):
        """
        Dumps ``self.data`` as JSON (2-space indent, non-ASCII characters kept as is) with a trailing newline.
        Missing parent directories of the target file are created.

        :param path: path to store package.json, defaults to original path
        :type path: str
        """
        if path is None:
            path = self.path

        directory = os.path.dirname(path)
        # An empty dirname means "current directory" (bare file name) - nothing to create.
        if directory and not os.path.isdir(directory):
            try:
                # makedirs, not mkdir: more than one level may be missing.
                os.makedirs(directory)
            except OSError:
                # Tolerate a concurrent creation of the directory, re-raise everything else.
                if not os.path.isdir(directory):
                    raise

        with open(path, "w") as f:
            json.dump(self.data, f, indent=2, ensure_ascii=False)
            f.write("\n")  # it's better for diff algorithm in arc

        logger.debug("Written %s", path)

    def get_name(self):
        """
        Returns the "name" field; if it is missing or empty, falls back to ``self.path``
        with every "/" replaced by "-".

        :rtype: str
        """
        name = self.data.get("name")

        if not name:
            name = self.path.replace("/", "-")

        return name

    def get_version(self):
        """
        Returns the "version" field.

        :raises KeyError: if the field is missing
        """
        return self.data["version"]

    def get_description(self):
        """
        Returns the "description" field or None if it is missing.
        """
        return self.data.get("description")

    def get_nodejs_version(self):
        """
        Returns the "engines.node" field or None if it is not defined.
        """
        # `or {}` also covers an explicit `"engines": null`.
        engines = self.data.get("engines") or {}

        return engines.get("node")

    def get_dep_specifier(self, dep_name):
        """
        Returns the specifier of the first dependency with the given name (sections are
        looked up in the DEP_KEYS order) or None if there is no such dependency.

        :param dep_name: dependency name
        :type dep_name: str
        """
        for name, spec in self.dependencies_iter():
            if dep_name == name:
                return spec

        return None

    def dependencies_iter(self):
        """
        Yields (name, specifier) pairs from all the sections listed in DEP_KEYS.
        A name defined in several sections is yielded once per section.
        """
        for key in self.DEP_KEYS:
            deps = self.data.get(key)
            if not deps:
                continue

            # .items() behaves the same way on Python 2 and 3 for this purpose.
            for name, spec in deps.items():
                yield (name, spec)

    def has_dependencies(self):
        """
        Tells whether at least one dependency is defined in any of the DEP_KEYS sections.

        :rtype: bool
        """
        first_dep = next(self.dependencies_iter(), None)

        return first_dep is not None

    def bins_iter(self):
        """
        Yields values of the "bin" field: the value itself if it is a string,
        every value of the mapping if it is a dict, nothing otherwise.
        """
        try:
            # Python 2: json yields `unicode` strings, which are not instances of `str`.
            string_types = basestring  # noqa: F821
        except NameError:
            string_types = str

        bins = self.data.get("bin")
        if isinstance(bins, string_types):
            yield bins
        elif isinstance(bins, dict):
            for bin_path in bins.values():
                yield bin_path

    def get_workspace_dep_spec_paths(self):
        """
        Returns names and paths from specifiers of the defined workspace dependencies.

        :rtype: list of (str, str)
        :raises PackageJsonWorkspaceError: if the part after "workspace:" does not start with "."
        """
        spec_paths = []
        schema = self.WORKSPACE_SCHEMA
        schema_len = len(schema)

        for name, spec in self.dependencies_iter():
            if not spec.startswith(schema):
                continue

            spec_path = spec[schema_len:]
            # A single check is enough: everything that starts with ".." starts with "." as well.
            if not spec_path.startswith("."):
                raise PackageJsonWorkspaceError(
                    "Expected relative path specifier for workspace dependency, but got '{}' for {} in {}".format(
                        spec, name, self.path
                    )
                )

            spec_paths.append((name, spec_path))

        return spec_paths

    def get_workspace_dep_paths(self, base_path=None):
        """
        Returns paths of the defined workspace dependencies.

        :param base_path: base path to resolve relative dep paths, defaults to the directory of this package.json
        :type base_path: str
        :rtype: list of str
        """
        if base_path is None:
            base_path = os.path.dirname(self.path)

        return [os.path.normpath(os.path.join(base_path, p)) for _, p in self.get_workspace_dep_spec_paths()]

    def get_workspace_deps(self):
        """
        Loads package.json of every direct workspace dependency.

        :rtype: list of PackageJson
        :raises PackageJsonWorkspaceError: if the name of a loaded package differs from the dependency name
        """
        ws_deps = []
        pj_dir = os.path.dirname(self.path)

        for name, rel_path in self.get_workspace_dep_spec_paths():
            dep_path = os.path.normpath(os.path.join(pj_dir, rel_path))
            # build_pj_path is expected to turn the package directory into its package.json path.
            dep_pj = PackageJson.load(build_pj_path(dep_path))

            if name != dep_pj.get_name():
                raise PackageJsonWorkspaceError(
                    "Workspace dependency name mismatch, found '{}' instead of '{}' in {}".format(
                        name, dep_pj.get_name(), self.path
                    )
                )

            ws_deps.append(dep_pj)

        return ws_deps

    def get_workspace_map(self, ignore_self=False):
        """
        Returns absolute paths of the workspace dependencies (including transitive) mapped to package.json and depth.

        :param ignore_self: whether path of the current module will be excluded
        :type ignore_self: bool
        :rtype: dict of (PackageJson, int)
        """
        ws_deps = {}
        # Directories that are already processed. Kept apart from `ws_deps`, so the own directory stays
        # excluded with `ignore_self` even if a dependency cycle leads back to it.
        visited = set()
        # list of (pj, depth)
        # NOTE: items are taken from the end, so the traversal is depth-first and the stored depth
        # is the depth of the first visit, which is not necessarily the minimal one.
        pj_queue = [(self, 0)]

        while pj_queue:
            (pj, depth) = pj_queue.pop()
            pj_dir = os.path.dirname(pj.path)
            if pj_dir in visited:
                continue
            visited.add(pj_dir)

            if not ignore_self or pj is not self:
                ws_deps[pj_dir] = (pj, depth)

            for dep_pj in pj.get_workspace_deps():
                pj_queue.append((dep_pj, depth + 1))

        return ws_deps

    def get_dep_paths_by_names(self):
        """
        Returns dict of {dependency_name: dependency_path}, the current package included.
        """
        ws_map = self.get_workspace_map()

        return {pj.get_name(): path for path, (pj, _) in ws_map.items()}
|