# -*- coding: utf-8 -*-
from __future__ import absolute_import

import atexit
import errno
import fnmatch
import itertools
import operator
import os
import shutil
import sys
import uuid
import warnings
from functools import partial
from functools import reduce
from os.path import expanduser
from os.path import expandvars
from os.path import isabs
from os.path import sep
from posixpath import sep as posix_sep

import six
from six.moves import map

from .compat import PY36
from _pytest.warning_types import PytestWarning

if PY36:
    from pathlib import Path, PurePath
else:
    from pathlib2 import Path, PurePath

__all__ = ["Path", "PurePath"]


LOCK_TIMEOUT = 60 * 60 * 3

get_lock_path = operator.methodcaller("joinpath", ".lock")


def ensure_reset_dir(path):
    """
    ensures the given path is an empty directory
    """
    if path.exists():
        rm_rf(path)
    path.mkdir()


def on_rm_rf_error(func, path, exc, **kwargs):
    """Handles known read-only errors during rmtree.

    The returned value is used only by our own tests.
    """
    start_path = kwargs["start_path"]
    exctype, excvalue = exc[:2]

    # another process removed the file in the middle of the "rm_rf" (xdist for example)
    # more context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
    if isinstance(excvalue, OSError) and excvalue.errno == errno.ENOENT:
        return False

    if not isinstance(excvalue, OSError) or excvalue.errno not in (
        errno.EACCES,
        errno.EPERM,
    ):
        warnings.warn(
            PytestWarning(
                "(rm_rf) error removing {}\n{}: {}".format(path, exctype, excvalue)
            )
        )
        return False

    if func not in (os.rmdir, os.remove, os.unlink):
        warnings.warn(
            PytestWarning(
                "(rm_rf) unknown function {} when removing {}:\n{}: {}".format(
                    path, func, exctype, excvalue
                )
            )
        )
        return False

    # Chmod + retry.
    import stat

    def chmod_rw(p):
        mode = os.stat(p).st_mode
        os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR)

    # For files, we need to recursively go upwards in the directories to
    # ensure they all are also writable.
    p = Path(path)
    if p.is_file():
        for parent in p.parents:
            chmod_rw(str(parent))
            # stop when we reach the original path passed to rm_rf
            if parent == start_path:
                break
    chmod_rw(str(path))

    func(path)
    return True


def rm_rf(path):
    """Remove the path contents recursively, even if some elements
    are read-only.
    """
    onerror = partial(on_rm_rf_error, start_path=path)
    shutil.rmtree(str(path), onerror=onerror)


def find_prefixed(root, prefix):
    """finds all elements in root that begin with the prefix, case insensitive"""
    l_prefix = prefix.lower()
    for x in root.iterdir():
        if x.name.lower().startswith(l_prefix):
            yield x


def extract_suffixes(iter, prefix):
    """
    :param iter: iterator over path names
    :param prefix: expected prefix of the path names
    :returns: the parts of the paths following the prefix
    """
    p_len = len(prefix)
    for p in iter:
        yield p.name[p_len:]


def find_suffixes(root, prefix):
    """combines find_prefixes and extract_suffixes
    """
    return extract_suffixes(find_prefixed(root, prefix), prefix)


def parse_num(maybe_num):
    """parses number path suffixes, returns -1 on error"""
    try:
        return int(maybe_num)
    except ValueError:
        return -1


if six.PY2:

    def _max(iterable, default):
        """needed due to python2.7 lacking the default argument for max"""
        return reduce(max, iterable, default)


else:
    _max = max


def _force_symlink(root, target, link_to):
    """helper to create the current symlink

    it's full of race conditions that are reasonably ok to ignore
    for the context of best effort linking to the latest testrun

    the presumption being thatin case of much parallelism
    the inaccuracy is going to be acceptable
    """
    current_symlink = root.joinpath(target)
    try:
        current_symlink.unlink()
    except OSError:
        pass
    try:
        current_symlink.symlink_to(link_to)
    except Exception:
        pass


def make_numbered_dir(root, prefix):
    """create a directory with an increased number as suffix for the given prefix"""
    for i in range(10):
        # try up to 10 times to create the folder
        max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1)
        new_number = max_existing + 1
        new_path = root.joinpath("{}{}".format(prefix, new_number))
        try:
            new_path.mkdir()
        except Exception:
            pass
        else:
            _force_symlink(root, prefix + "current", new_path)
            return new_path
    else:
        raise EnvironmentError(
            "could not create numbered dir with prefix "
            "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root)
        )


def create_cleanup_lock(p):
    """crates a lock to prevent premature folder cleanup"""
    lock_path = get_lock_path(p)
    try:
        fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    except OSError as e:
        if e.errno == errno.EEXIST:
            six.raise_from(
                EnvironmentError("cannot create lockfile in {path}".format(path=p)), e
            )
        else:
            raise
    else:
        pid = os.getpid()
        spid = str(pid)
        if not isinstance(spid, bytes):
            spid = spid.encode("ascii")
        os.write(fd, spid)
        os.close(fd)
        if not lock_path.is_file():
            raise EnvironmentError("lock path got renamed after successful creation")
        return lock_path


def register_cleanup_lock_removal(lock_path, register=atexit.register):
    """registers a cleanup function for removing a lock, by default on atexit"""
    pid = os.getpid()

    def cleanup_on_exit(lock_path=lock_path, original_pid=pid):
        current_pid = os.getpid()
        if current_pid != original_pid:
            # fork
            return
        try:
            lock_path.unlink()
        except (OSError, IOError):
            pass

    return register(cleanup_on_exit)


def maybe_delete_a_numbered_dir(path):
    """removes a numbered directory if its lock can be obtained and it does not seem to be in use"""
    lock_path = None
    try:
        lock_path = create_cleanup_lock(path)
        parent = path.parent

        garbage = parent.joinpath("garbage-{}".format(uuid.uuid4()))
        path.rename(garbage)
        rm_rf(garbage)
    except (OSError, EnvironmentError):
        #  known races:
        #  * other process did a cleanup at the same time
        #  * deletable folder was found
        #  * process cwd (Windows)
        return
    finally:
        # if we created the lock, ensure we remove it even if we failed
        # to properly remove the numbered dir
        if lock_path is not None:
            try:
                lock_path.unlink()
            except (OSError, IOError):
                pass


def ensure_deletable(path, consider_lock_dead_if_created_before):
    """checks if a lock exists and breaks it if its considered dead"""
    if path.is_symlink():
        return False
    lock = get_lock_path(path)
    if not lock.exists():
        return True
    try:
        lock_time = lock.stat().st_mtime
    except Exception:
        return False
    else:
        if lock_time < consider_lock_dead_if_created_before:
            lock.unlink()
            return True
        else:
            return False


def try_cleanup(path, consider_lock_dead_if_created_before):
    """tries to cleanup a folder if we can ensure it's deletable"""
    if ensure_deletable(path, consider_lock_dead_if_created_before):
        maybe_delete_a_numbered_dir(path)


def cleanup_candidates(root, prefix, keep):
    """lists candidates for numbered directories to be removed - follows py.path"""
    max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1)
    max_delete = max_existing - keep
    paths = find_prefixed(root, prefix)
    paths, paths2 = itertools.tee(paths)
    numbers = map(parse_num, extract_suffixes(paths2, prefix))
    for path, number in zip(paths, numbers):
        if number <= max_delete:
            yield path


def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_before):
    """cleanup for lock driven numbered directories"""
    for path in cleanup_candidates(root, prefix, keep):
        try_cleanup(path, consider_lock_dead_if_created_before)
    for path in root.glob("garbage-*"):
        try_cleanup(path, consider_lock_dead_if_created_before)


def make_numbered_dir_with_cleanup(root, prefix, keep, lock_timeout):
    """creates a numbered dir with a cleanup lock and removes old ones"""
    e = None
    for i in range(10):
        try:
            p = make_numbered_dir(root, prefix)
            lock_path = create_cleanup_lock(p)
            register_cleanup_lock_removal(lock_path)
        except Exception as exc:
            e = exc
        else:
            consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout
            cleanup_numbered_dir(
                root=root,
                prefix=prefix,
                keep=keep,
                consider_lock_dead_if_created_before=consider_lock_dead_if_created_before,
            )
            return p
    assert e is not None
    raise e


def resolve_from_str(input, root):
    assert not isinstance(input, Path), "would break on py2"
    root = Path(root)
    input = expanduser(input)
    input = expandvars(input)
    if isabs(input):
        return Path(input)
    else:
        return root.joinpath(input)


def fnmatch_ex(pattern, path):
    """FNMatcher port from py.path.common which works with PurePath() instances.

    The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions
    for each part of the path, while this algorithm uses the whole path instead.

    For example:
        "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" with this algorithm, but not with
        PurePath.match().

    This algorithm was ported to keep backward-compatibility with existing settings which assume paths match according
    this logic.

    References:
    * https://bugs.python.org/issue29249
    * https://bugs.python.org/issue34731
    """
    path = PurePath(path)
    iswin32 = sys.platform.startswith("win")

    if iswin32 and sep not in pattern and posix_sep in pattern:
        # Running on Windows, the pattern has no Windows path separators,
        # and the pattern has one or more Posix path separators. Replace
        # the Posix path separators with the Windows path separator.
        pattern = pattern.replace(posix_sep, sep)

    if sep not in pattern:
        name = path.name
    else:
        name = six.text_type(path)
    return fnmatch.fnmatch(name, pattern)


def parts(s):
    parts = s.split(sep)
    return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}