import pathlib
import io
import os
import re
import sys
import warnings
from importlib.metadata import (
Distribution,
DistributionFinder,
Prepared,
)
from importlib.resources.abc import Traversable
import __res
# Import ResourceReader with DeprecationWarning silenced for the duration of
# the import statement only; the previous warning filters are restored when
# the ``with`` block exits.
with warnings.catch_warnings(action="ignore", category=DeprecationWarning):
    from importlib.abc import ResourceReader

# Register the resfs reader class as a virtual subclass of ResourceReader so
# isinstance()/issubclass() checks against the ABC accept it.
ResourceReader.register(__res._ResfsResourceReader)

# Captures the remainder of a line that starts with "Name: ".  MULTILINE makes
# ^ and $ match at every line boundary, so the line may appear anywhere in the
# searched text.
METADATA_NAME = re.compile("^Name: (.*)$", re.MULTILINE)
class ArcadiaTraversable(Traversable):
def __init__(self, resfs):
self._resfs = resfs
self._path = pathlib.Path(resfs)
def __eq__(self, other) -> bool:
if isinstance(other, ArcadiaTraversable):
return self._path == other._path
raise NotImplementedError
def __lt__(self, other) -> bool:
if isinstance(other, ArcadiaTraversable):
return self._path < other._path
raise NotImplementedError
def __hash__(self) -> int:
return hash(self._path)
@property
def name(self):
return self._path.name
@property
def suffix(self):
return self._path.suffix
class ArcadiaResource(ArcadiaTraversable):
    """A single resfs entry exposed as a file-like (leaf) ``Traversable``."""

    def is_file(self):
        """A resource is always reported as a file."""
        return True

    def is_dir(self):
        """A resource is never reported as a directory."""
        return False

    def open(self, mode="r", *args, **kwargs):
        """Return an in-memory stream over the resource content.

        Args:
            mode: If it contains ``"b"`` the raw ``io.BytesIO`` is returned;
                otherwise the bytes are wrapped in ``io.TextIOWrapper``.
            *args, **kwargs: Forwarded to ``io.TextIOWrapper`` in text mode
                (ignored in binary mode).

        Raises:
            FileNotFoundError: If the lookup for this key returns ``None``.
        """
        # The module is fetched by its string name on purpose: inside a class
        # body an identifier that starts with two underscores is subject to
        # private-name mangling, so spelling the module name directly here
        # would be looked up as ``_ArcadiaResource__res`` and fail.
        data = sys.modules["__res"].find(self._resfs.encode("utf-8"))
        if data is None:
            raise FileNotFoundError(self._resfs)
        stream = io.BytesIO(data)
        if "b" not in mode:
            stream = io.TextIOWrapper(stream, *args, **kwargs)
        return stream

    def joinpath(self, *name):
        """Always fails: a resource has no children to descend into."""
        raise RuntimeError("Cannot traverse into a resource")

    def iterdir(self):
        """Return an empty iterator; a resource has no children."""
        return iter(())

    def __repr__(self) -> str:
        return f"ArcadiaResource({self._resfs!r})"
class ArcadiaResourceContainer(ArcadiaTraversable):
    """A resfs key prefix exposed as a directory-like ``Traversable``.

    NOTE(review): child prefixes are built as ``f"{self._resfs}{subdir}/"``,
    so the prefix given to the constructor is expected to already end with
    ``/`` (or be empty) - confirm against callers.
    """

    def is_dir(self):
        """A container is always reported as a directory."""
        return True

    def is_file(self):
        """A container is never reported as a file."""
        return False

    def iterdir(self):
        """Yield the immediate children found under this prefix.

        Every item produced by ``iter_keys`` is unpacked as
        ``(key, path_without_prefix)``. If the remainder still contains a
        ``/``, its first component is reported once as a nested container;
        otherwise the full key is reported as a resource.
        """
        # The module is fetched by its string name on purpose: inside a class
        # body an identifier that starts with two underscores is subject to
        # private-name mangling, so spelling the module name directly here
        # would be looked up under a mangled name and fail.
        resfs = sys.modules["__res"]
        seen = set()
        for key, path_without_prefix in resfs.iter_keys(self._resfs.encode("utf-8")):
            if b"/" in path_without_prefix:
                subdir = path_without_prefix.split(b"/", maxsplit=1)[0].decode("utf-8")
                # Many keys share the same first component; emit it only once.
                if subdir not in seen:
                    seen.add(subdir)
                    yield ArcadiaResourceContainer(f"{self._resfs}{subdir}/")
            else:
                yield ArcadiaResource(key.decode("utf-8"))

    def open(self, *args, **kwargs):
        """Always fails: a container cannot be opened as a file."""
        raise IsADirectoryError(self._resfs)

    @staticmethod
    def _flatten(compound_names):
        """Split every name on ``/`` and yield the individual components."""
        for name in compound_names:
            yield from name.split("/")

    def joinpath(self, *descendants):
        """Resolve *descendants* (names that may contain ``/``) below this node.

        Returns ``self`` when called without arguments. Otherwise the first
        component is matched against the children: a matching resource is
        returned as is (any remaining components are not consulted), and a
        matching container is asked to resolve the remaining components.

        Raises:
            FileNotFoundError: If no child is named like the first component.
        """
        if not descendants:
            return self

        names = self._flatten(descendants)
        target = next(names)
        for traversable in self.iterdir():
            if traversable.name == target:
                if isinstance(traversable, ArcadiaResource):
                    return traversable
                else:
                    # ``names`` has already advanced past ``target``, so only
                    # the remaining components are passed down.
                    return traversable.joinpath(*names)

        raise FileNotFoundError("/".join(self._flatten(descendants)))

    def __repr__(self):
        return f"ArcadiaResourceContainer({self._resfs!r})"
class ArcadiaDistribution(Distribution):
    """``Distribution`` whose metadata files are read from resfs."""

    def __init__(self, prefix):
        """Remember the key *prefix* under which the metadata files live.

        Args:
            prefix: String prepended verbatim to every filename passed to
                ``read_text``.
        """
        self._prefix = prefix
        self._path = pathlib.Path(prefix)

    def read_text(self, filename):
        # The module is fetched by its string name on purpose: inside a class
        # body an identifier that starts with two underscores is subject to
        # private-name mangling, so spelling the module name directly here
        # would be looked up as ``_ArcadiaDistribution__res`` and fail.
        data = sys.modules["__res"].resfs_read(f"{self._prefix}{filename}")
        # A missing entry (``None``) falls through and yields ``None``.
        if data is not None:
            return data.decode("utf-8")

    # Reuse the base-class documentation for the overridden method.
    read_text.__doc__ = Distribution.read_text.__doc__

    def locate_file(self, path):
        """Return *path* resolved against the parent of the metadata prefix."""
        return self._path.parent / path
class MetadataArcadiaFinder(DistributionFinder):
prefixes = {}
@classmethod
def find_distributions(cls, context=DistributionFinder.Context()):
found = cls._search_prefixes(context.name)
return map(ArcadiaDistribution, found)
@classmethod
def _init_prefixes(cls):
cls.prefixes.clear()
for resource in __res.resfs_files():
resource = resource.decode("utf-8")
if not resource.endswith("METADATA"):
continue
data = __res.resfs_read(resource).decode("utf-8")
metadata_name = METADATA_NAME.search(data)
if metadata_name:
cls.prefixes[Prepared(metadata_name.group(1)).normalized] = resource.removesuffix("METADATA")
@classmethod
def _search_prefixes(cls, name):
if not cls.prefixes:
cls._init_prefixes()
if name:
try:
yield cls.prefixes[Prepared(name).normalized]
except KeyError:
pass
else:
yield from sorted(cls.prefixes.values())