import pathlib
import io
import os
import re
import sys
import warnings
from importlib.metadata import Distribution, DistributionFinder, PackageNotFoundError, Prepared
from importlib.resources.abc import Traversable
import __res
with warnings.catch_warnings(action="ignore", category=DeprecationWarning):
from importlib.abc import ResourceReader
ResourceReader.register(__res._ResfsResourceReader)
METADATA_NAME = re.compile('^Name: (.*)$', re.MULTILINE)
class ArcadiaResourceHandle(Traversable):
def __init__(self, key):
self.resfs_key = key
def is_file(self):
return True
def is_dir(self):
return False
def open(self, mode='r', *args, **kwargs):
data = __res.find(self.resfs_key.encode("utf-8"))
if data is None:
raise FileNotFoundError(self.resfs_key)
stream = io.BytesIO(data)
if 'b' not in mode:
stream = io.TextIOWrapper(stream, *args, **kwargs)
return stream
def joinpath(self, *name):
raise RuntimeError("Cannot traverse into a resource")
def iterdir(self):
return iter(())
@property
def name(self):
return os.path.basename(self.resfs_key)
class ArcadiaResourceContainer(Traversable):
def __init__(self, prefix):
self.resfs_prefix = prefix
def is_dir(self):
return True
def is_file(self):
return False
def iterdir(self):
for key, path_without_prefix in __res.iter_keys(self.resfs_prefix.encode("utf-8")):
if b"/" in path_without_prefix:
name = path_without_prefix.decode("utf-8").split("/", maxsplit=1)[0]
yield ArcadiaResourceContainer(f"{self.resfs_prefix}{name}/")
else:
yield ArcadiaResourceHandle(key.decode("utf-8"))
def open(self, *args, **kwargs):
raise IsADirectoryError(self.resfs_prefix)
def joinpath(self, *descendants):
if not descendants:
return self
return ArcadiaResourceHandle(os.path.join(self.resfs_prefix, *descendants))
@property
def name(self):
return os.path.basename(self.resfs_prefix[:-1])
class ArcadiaDistribution(Distribution):
def __init__(self, prefix):
self.prefix = prefix
@property
def _path(self):
return pathlib.Path(self.prefix)
def read_text(self, filename):
data = __res.resfs_read(f'{self.prefix}{filename}')
if data:
return data.decode('utf-8')
read_text.__doc__ = Distribution.read_text.__doc__
def locate_file(self, path):
return f'{self.prefix}{path}'
class ArcadiaMetadataFinder(DistributionFinder):
prefixes = {}
@classmethod
def find_distributions(cls, context=DistributionFinder.Context()):
found = cls._search_prefixes(context.name)
return map(ArcadiaDistribution, found)
@classmethod
def _init_prefixes(cls):
cls.prefixes.clear()
for resource in __res.resfs_files():
resource = resource.decode('utf-8')
if not resource.endswith('METADATA'):
continue
data = __res.resfs_read(resource).decode('utf-8')
metadata_name = METADATA_NAME.search(data)
if metadata_name:
metadata_name = Prepared(metadata_name.group(1))
cls.prefixes[metadata_name.normalized] = resource[:-len('METADATA')]
@classmethod
def _search_prefixes(cls, name):
if not cls.prefixes:
cls._init_prefixes()
if name:
try:
yield cls.prefixes[Prepared(name).normalized]
except KeyError:
raise PackageNotFoundError(name)
else:
for prefix in sorted(cls.prefixes.values()):
yield prefix
# monkeypatch standart library
import importlib.metadata
importlib.metadata.MetadataPathFinder = ArcadiaMetadataFinder