diff options
| author | nkozlovskiy <[email protected]> | 2023-09-29 12:24:06 +0300 | 
|---|---|---|
| committer | nkozlovskiy <[email protected]> | 2023-09-29 12:41:34 +0300 | 
| commit | e0e3e1717e3d33762ce61950504f9637a6e669ed (patch) | |
| tree | bca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/Jinja2/py3/jinja2/utils.py | |
| parent | 38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff) | |
add ydb deps
Diffstat (limited to 'contrib/python/Jinja2/py3/jinja2/utils.py')
| -rw-r--r-- | contrib/python/Jinja2/py3/jinja2/utils.py | 755 | 
1 files changed, 755 insertions, 0 deletions
| diff --git a/contrib/python/Jinja2/py3/jinja2/utils.py b/contrib/python/Jinja2/py3/jinja2/utils.py new file mode 100644 index 00000000000..9b5f5a50eb6 --- /dev/null +++ b/contrib/python/Jinja2/py3/jinja2/utils.py @@ -0,0 +1,755 @@ +import enum +import json +import os +import re +import typing as t +from collections import abc +from collections import deque +from random import choice +from random import randrange +from threading import Lock +from types import CodeType +from urllib.parse import quote_from_bytes + +import markupsafe + +if t.TYPE_CHECKING: +    import typing_extensions as te + +F = t.TypeVar("F", bound=t.Callable[..., t.Any]) + +# special singleton representing missing values for the runtime +missing: t.Any = type("MissingType", (), {"__repr__": lambda x: "missing"})() + +internal_code: t.MutableSet[CodeType] = set() + +concat = "".join + + +def pass_context(f: F) -> F: +    """Pass the :class:`~jinja2.runtime.Context` as the first argument +    to the decorated function when called while rendering a template. + +    Can be used on functions, filters, and tests. + +    If only ``Context.eval_context`` is needed, use +    :func:`pass_eval_context`. If only ``Context.environment`` is +    needed, use :func:`pass_environment`. + +    .. versionadded:: 3.0.0 +        Replaces ``contextfunction`` and ``contextfilter``. +    """ +    f.jinja_pass_arg = _PassArg.context  # type: ignore +    return f + + +def pass_eval_context(f: F) -> F: +    """Pass the :class:`~jinja2.nodes.EvalContext` as the first argument +    to the decorated function when called while rendering a template. +    See :ref:`eval-context`. + +    Can be used on functions, filters, and tests. + +    If only ``EvalContext.environment`` is needed, use +    :func:`pass_environment`. + +    .. versionadded:: 3.0.0 +        Replaces ``evalcontextfunction`` and ``evalcontextfilter``. +    """ +    f.jinja_pass_arg = _PassArg.eval_context  # type: ignore +    return f + + +def pass_environment(f: F) -> F: +    """Pass the :class:`~jinja2.Environment` as the first argument to +    the decorated function when called while rendering a template. + +    Can be used on functions, filters, and tests. + +    .. versionadded:: 3.0.0 +        Replaces ``environmentfunction`` and ``environmentfilter``. +    """ +    f.jinja_pass_arg = _PassArg.environment  # type: ignore +    return f + + +class _PassArg(enum.Enum): +    context = enum.auto() +    eval_context = enum.auto() +    environment = enum.auto() + +    @classmethod +    def from_obj(cls, obj: F) -> t.Optional["_PassArg"]: +        if hasattr(obj, "jinja_pass_arg"): +            return obj.jinja_pass_arg  # type: ignore + +        return None + + +def internalcode(f: F) -> F: +    """Marks the function as internally used""" +    internal_code.add(f.__code__) +    return f + + +def is_undefined(obj: t.Any) -> bool: +    """Check if the object passed is undefined.  This does nothing more than +    performing an instance check against :class:`Undefined` but looks nicer. +    This can be used for custom filters or tests that want to react to +    undefined variables.  For example a custom default filter can look like +    this:: + +        def default(var, default=''): +            if is_undefined(var): +                return default +            return var +    """ +    from .runtime import Undefined + +    return isinstance(obj, Undefined) + + +def consume(iterable: t.Iterable[t.Any]) -> None: +    """Consumes an iterable without doing anything with it.""" +    for _ in iterable: +        pass + + +def clear_caches() -> None: +    """Jinja keeps internal caches for environments and lexers.  These are +    used so that Jinja doesn't have to recreate environments and lexers all +    the time.  Normally you don't have to care about that but if you are +    measuring memory consumption you may want to clean the caches. +    """ +    from .environment import get_spontaneous_environment +    from .lexer import _lexer_cache + +    get_spontaneous_environment.cache_clear() +    _lexer_cache.clear() + + +def import_string(import_name: str, silent: bool = False) -> t.Any: +    """Imports an object based on a string.  This is useful if you want to +    use import paths as endpoints or something similar.  An import path can +    be specified either in dotted notation (``xml.sax.saxutils.escape``) +    or with a colon as object delimiter (``xml.sax.saxutils:escape``). + +    If the `silent` is True the return value will be `None` if the import +    fails. + +    :return: imported object +    """ +    try: +        if ":" in import_name: +            module, obj = import_name.split(":", 1) +        elif "." in import_name: +            module, _, obj = import_name.rpartition(".") +        else: +            return __import__(import_name) +        return getattr(__import__(module, None, None, [obj]), obj) +    except (ImportError, AttributeError): +        if not silent: +            raise + + +def open_if_exists(filename: str, mode: str = "rb") -> t.Optional[t.IO]: +    """Returns a file descriptor for the filename if that file exists, +    otherwise ``None``. +    """ +    if not os.path.isfile(filename): +        return None + +    return open(filename, mode) + + +def object_type_repr(obj: t.Any) -> str: +    """Returns the name of the object's type.  For some recognized +    singletons the name of the object is returned instead. (For +    example for `None` and `Ellipsis`). +    """ +    if obj is None: +        return "None" +    elif obj is Ellipsis: +        return "Ellipsis" + +    cls = type(obj) + +    if cls.__module__ == "builtins": +        return f"{cls.__name__} object" + +    return f"{cls.__module__}.{cls.__name__} object" + + +def pformat(obj: t.Any) -> str: +    """Format an object using :func:`pprint.pformat`.""" +    from pprint import pformat  # type: ignore + +    return pformat(obj) + + +_http_re = re.compile( +    r""" +    ^ +    ( +        (https?://|www\.)  # scheme or www +        (([\w%-]+\.)+)?  # subdomain +        ( +            [a-z]{2,63}  # basic tld +        | +            xn--[\w%]{2,59}  # idna tld +        ) +    | +        ([\w%-]{2,63}\.)+  # basic domain +        (com|net|int|edu|gov|org|info|mil)  # basic tld +    | +        (https?://)  # scheme +        ( +            (([\d]{1,3})(\.[\d]{1,3}){3})  # IPv4 +        | +            (\[([\da-f]{0,4}:){2}([\da-f]{0,4}:?){1,6}])  # IPv6 +        ) +    ) +    (?::[\d]{1,5})?  # port +    (?:[/?#]\S*)?  # path, query, and fragment +    $ +    """, +    re.IGNORECASE | re.VERBOSE, +) +_email_re = re.compile(r"^\S+@\w[\w.-]*\.\w+$") + + +def urlize( +    text: str, +    trim_url_limit: t.Optional[int] = None, +    rel: t.Optional[str] = None, +    target: t.Optional[str] = None, +    extra_schemes: t.Optional[t.Iterable[str]] = None, +) -> str: +    """Convert URLs in text into clickable links. + +    This may not recognize links in some situations. Usually, a more +    comprehensive formatter, such as a Markdown library, is a better +    choice. + +    Works on ``http://``, ``https://``, ``www.``, ``mailto:``, and email +    addresses. Links with trailing punctuation (periods, commas, closing +    parentheses) and leading punctuation (opening parentheses) are +    recognized excluding the punctuation. Email addresses that include +    header fields are not recognized (for example, +    ``mailto:[email protected][email protected]``). + +    :param text: Original text containing URLs to link. +    :param trim_url_limit: Shorten displayed URL values to this length. +    :param target: Add the ``target`` attribute to links. +    :param rel: Add the ``rel`` attribute to links. +    :param extra_schemes: Recognize URLs that start with these schemes +        in addition to the default behavior. + +    .. versionchanged:: 3.0 +        The ``extra_schemes`` parameter was added. + +    .. versionchanged:: 3.0 +        Generate ``https://`` links for URLs without a scheme. + +    .. versionchanged:: 3.0 +        The parsing rules were updated. Recognize email addresses with +        or without the ``mailto:`` scheme. Validate IP addresses. Ignore +        parentheses and brackets in more cases. +    """ +    if trim_url_limit is not None: + +        def trim_url(x: str) -> str: +            if len(x) > trim_url_limit:  # type: ignore +                return f"{x[:trim_url_limit]}..." + +            return x + +    else: + +        def trim_url(x: str) -> str: +            return x + +    words = re.split(r"(\s+)", str(markupsafe.escape(text))) +    rel_attr = f' rel="{markupsafe.escape(rel)}"' if rel else "" +    target_attr = f' target="{markupsafe.escape(target)}"' if target else "" + +    for i, word in enumerate(words): +        head, middle, tail = "", word, "" +        match = re.match(r"^([(<]|<)+", middle) + +        if match: +            head = match.group() +            middle = middle[match.end() :] + +        # Unlike lead, which is anchored to the start of the string, +        # need to check that the string ends with any of the characters +        # before trying to match all of them, to avoid backtracking. +        if middle.endswith((")", ">", ".", ",", "\n", ">")): +            match = re.search(r"([)>.,\n]|>)+$", middle) + +            if match: +                tail = match.group() +                middle = middle[: match.start()] + +        # Prefer balancing parentheses in URLs instead of ignoring a +        # trailing character. +        for start_char, end_char in ("(", ")"), ("<", ">"), ("<", ">"): +            start_count = middle.count(start_char) + +            if start_count <= middle.count(end_char): +                # Balanced, or lighter on the left +                continue + +            # Move as many as possible from the tail to balance +            for _ in range(min(start_count, tail.count(end_char))): +                end_index = tail.index(end_char) + len(end_char) +                # Move anything in the tail before the end char too +                middle += tail[:end_index] +                tail = tail[end_index:] + +        if _http_re.match(middle): +            if middle.startswith("https://") or middle.startswith("http://"): +                middle = ( +                    f'<a href="{middle}"{rel_attr}{target_attr}>{trim_url(middle)}</a>' +                ) +            else: +                middle = ( +                    f'<a href="https://{middle}"{rel_attr}{target_attr}>' +                    f"{trim_url(middle)}</a>" +                ) + +        elif middle.startswith("mailto:") and _email_re.match(middle[7:]): +            middle = f'<a href="{middle}">{middle[7:]}</a>' + +        elif ( +            "@" in middle +            and not middle.startswith("www.") +            and ":" not in middle +            and _email_re.match(middle) +        ): +            middle = f'<a href="mailto:{middle}">{middle}</a>' + +        elif extra_schemes is not None: +            for scheme in extra_schemes: +                if middle != scheme and middle.startswith(scheme): +                    middle = f'<a href="{middle}"{rel_attr}{target_attr}>{middle}</a>' + +        words[i] = f"{head}{middle}{tail}" + +    return "".join(words) + + +def generate_lorem_ipsum( +    n: int = 5, html: bool = True, min: int = 20, max: int = 100 +) -> str: +    """Generate some lorem ipsum for the template.""" +    from .constants import LOREM_IPSUM_WORDS + +    words = LOREM_IPSUM_WORDS.split() +    result = [] + +    for _ in range(n): +        next_capitalized = True +        last_comma = last_fullstop = 0 +        word = None +        last = None +        p = [] + +        # each paragraph contains out of 20 to 100 words. +        for idx, _ in enumerate(range(randrange(min, max))): +            while True: +                word = choice(words) +                if word != last: +                    last = word +                    break +            if next_capitalized: +                word = word.capitalize() +                next_capitalized = False +            # add commas +            if idx - randrange(3, 8) > last_comma: +                last_comma = idx +                last_fullstop += 2 +                word += "," +            # add end of sentences +            if idx - randrange(10, 20) > last_fullstop: +                last_comma = last_fullstop = idx +                word += "." +                next_capitalized = True +            p.append(word) + +        # ensure that the paragraph ends with a dot. +        p_str = " ".join(p) + +        if p_str.endswith(","): +            p_str = p_str[:-1] + "." +        elif not p_str.endswith("."): +            p_str += "." + +        result.append(p_str) + +    if not html: +        return "\n\n".join(result) +    return markupsafe.Markup( +        "\n".join(f"<p>{markupsafe.escape(x)}</p>" for x in result) +    ) + + +def url_quote(obj: t.Any, charset: str = "utf-8", for_qs: bool = False) -> str: +    """Quote a string for use in a URL using the given charset. + +    :param obj: String or bytes to quote. Other types are converted to +        string then encoded to bytes using the given charset. +    :param charset: Encode text to bytes using this charset. +    :param for_qs: Quote "/" and use "+" for spaces. +    """ +    if not isinstance(obj, bytes): +        if not isinstance(obj, str): +            obj = str(obj) + +        obj = obj.encode(charset) + +    safe = b"" if for_qs else b"/" +    rv = quote_from_bytes(obj, safe) + +    if for_qs: +        rv = rv.replace("%20", "+") + +    return rv + + +class LRUCache: +    """A simple LRU Cache implementation.""" + +    # this is fast for small capacities (something below 1000) but doesn't +    # scale.  But as long as it's only used as storage for templates this +    # won't do any harm. + +    def __init__(self, capacity: int) -> None: +        self.capacity = capacity +        self._mapping: t.Dict[t.Any, t.Any] = {} +        self._queue: "te.Deque[t.Any]" = deque() +        self._postinit() + +    def _postinit(self) -> None: +        # alias all queue methods for faster lookup +        self._popleft = self._queue.popleft +        self._pop = self._queue.pop +        self._remove = self._queue.remove +        self._wlock = Lock() +        self._append = self._queue.append + +    def __getstate__(self) -> t.Mapping[str, t.Any]: +        return { +            "capacity": self.capacity, +            "_mapping": self._mapping, +            "_queue": self._queue, +        } + +    def __setstate__(self, d: t.Mapping[str, t.Any]) -> None: +        self.__dict__.update(d) +        self._postinit() + +    def __getnewargs__(self) -> t.Tuple: +        return (self.capacity,) + +    def copy(self) -> "LRUCache": +        """Return a shallow copy of the instance.""" +        rv = self.__class__(self.capacity) +        rv._mapping.update(self._mapping) +        rv._queue.extend(self._queue) +        return rv + +    def get(self, key: t.Any, default: t.Any = None) -> t.Any: +        """Return an item from the cache dict or `default`""" +        try: +            return self[key] +        except KeyError: +            return default + +    def setdefault(self, key: t.Any, default: t.Any = None) -> t.Any: +        """Set `default` if the key is not in the cache otherwise +        leave unchanged. Return the value of this key. +        """ +        try: +            return self[key] +        except KeyError: +            self[key] = default +            return default + +    def clear(self) -> None: +        """Clear the cache.""" +        with self._wlock: +            self._mapping.clear() +            self._queue.clear() + +    def __contains__(self, key: t.Any) -> bool: +        """Check if a key exists in this cache.""" +        return key in self._mapping + +    def __len__(self) -> int: +        """Return the current size of the cache.""" +        return len(self._mapping) + +    def __repr__(self) -> str: +        return f"<{type(self).__name__} {self._mapping!r}>" + +    def __getitem__(self, key: t.Any) -> t.Any: +        """Get an item from the cache. Moves the item up so that it has the +        highest priority then. + +        Raise a `KeyError` if it does not exist. +        """ +        with self._wlock: +            rv = self._mapping[key] + +            if self._queue[-1] != key: +                try: +                    self._remove(key) +                except ValueError: +                    # if something removed the key from the container +                    # when we read, ignore the ValueError that we would +                    # get otherwise. +                    pass + +                self._append(key) + +            return rv + +    def __setitem__(self, key: t.Any, value: t.Any) -> None: +        """Sets the value for an item. Moves the item up so that it +        has the highest priority then. +        """ +        with self._wlock: +            if key in self._mapping: +                self._remove(key) +            elif len(self._mapping) == self.capacity: +                del self._mapping[self._popleft()] + +            self._append(key) +            self._mapping[key] = value + +    def __delitem__(self, key: t.Any) -> None: +        """Remove an item from the cache dict. +        Raise a `KeyError` if it does not exist. +        """ +        with self._wlock: +            del self._mapping[key] + +            try: +                self._remove(key) +            except ValueError: +                pass + +    def items(self) -> t.Iterable[t.Tuple[t.Any, t.Any]]: +        """Return a list of items.""" +        result = [(key, self._mapping[key]) for key in list(self._queue)] +        result.reverse() +        return result + +    def values(self) -> t.Iterable[t.Any]: +        """Return a list of all values.""" +        return [x[1] for x in self.items()] + +    def keys(self) -> t.Iterable[t.Any]: +        """Return a list of all keys ordered by most recent usage.""" +        return list(self) + +    def __iter__(self) -> t.Iterator[t.Any]: +        return reversed(tuple(self._queue)) + +    def __reversed__(self) -> t.Iterator[t.Any]: +        """Iterate over the keys in the cache dict, oldest items +        coming first. +        """ +        return iter(tuple(self._queue)) + +    __copy__ = copy + + +def select_autoescape( +    enabled_extensions: t.Collection[str] = ("html", "htm", "xml"), +    disabled_extensions: t.Collection[str] = (), +    default_for_string: bool = True, +    default: bool = False, +) -> t.Callable[[t.Optional[str]], bool]: +    """Intelligently sets the initial value of autoescaping based on the +    filename of the template.  This is the recommended way to configure +    autoescaping if you do not want to write a custom function yourself. + +    If you want to enable it for all templates created from strings or +    for all templates with `.html` and `.xml` extensions:: + +        from jinja2 import Environment, select_autoescape +        env = Environment(autoescape=select_autoescape( +            enabled_extensions=('html', 'xml'), +            default_for_string=True, +        )) + +    Example configuration to turn it on at all times except if the template +    ends with `.txt`:: + +        from jinja2 import Environment, select_autoescape +        env = Environment(autoescape=select_autoescape( +            disabled_extensions=('txt',), +            default_for_string=True, +            default=True, +        )) + +    The `enabled_extensions` is an iterable of all the extensions that +    autoescaping should be enabled for.  Likewise `disabled_extensions` is +    a list of all templates it should be disabled for.  If a template is +    loaded from a string then the default from `default_for_string` is used. +    If nothing matches then the initial value of autoescaping is set to the +    value of `default`. + +    For security reasons this function operates case insensitive. + +    .. versionadded:: 2.9 +    """ +    enabled_patterns = tuple(f".{x.lstrip('.').lower()}" for x in enabled_extensions) +    disabled_patterns = tuple(f".{x.lstrip('.').lower()}" for x in disabled_extensions) + +    def autoescape(template_name: t.Optional[str]) -> bool: +        if template_name is None: +            return default_for_string +        template_name = template_name.lower() +        if template_name.endswith(enabled_patterns): +            return True +        if template_name.endswith(disabled_patterns): +            return False +        return default + +    return autoescape + + +def htmlsafe_json_dumps( +    obj: t.Any, dumps: t.Optional[t.Callable[..., str]] = None, **kwargs: t.Any +) -> markupsafe.Markup: +    """Serialize an object to a string of JSON with :func:`json.dumps`, +    then replace HTML-unsafe characters with Unicode escapes and mark +    the result safe with :class:`~markupsafe.Markup`. + +    This is available in templates as the ``|tojson`` filter. + +    The following characters are escaped: ``<``, ``>``, ``&``, ``'``. + +    The returned string is safe to render in HTML documents and +    ``<script>`` tags. The exception is in HTML attributes that are +    double quoted; either use single quotes or the ``|forceescape`` +    filter. + +    :param obj: The object to serialize to JSON. +    :param dumps: The ``dumps`` function to use. Defaults to +        ``env.policies["json.dumps_function"]``, which defaults to +        :func:`json.dumps`. +    :param kwargs: Extra arguments to pass to ``dumps``. Merged onto +        ``env.policies["json.dumps_kwargs"]``. + +    .. versionchanged:: 3.0 +        The ``dumper`` parameter is renamed to ``dumps``. + +    .. versionadded:: 2.9 +    """ +    if dumps is None: +        dumps = json.dumps + +    return markupsafe.Markup( +        dumps(obj, **kwargs) +        .replace("<", "\\u003c") +        .replace(">", "\\u003e") +        .replace("&", "\\u0026") +        .replace("'", "\\u0027") +    ) + + +class Cycler: +    """Cycle through values by yield them one at a time, then restarting +    once the end is reached. Available as ``cycler`` in templates. + +    Similar to ``loop.cycle``, but can be used outside loops or across +    multiple loops. For example, render a list of folders and files in a +    list, alternating giving them "odd" and "even" classes. + +    .. code-block:: html+jinja + +        {% set row_class = cycler("odd", "even") %} +        <ul class="browser"> +        {% for folder in folders %} +          <li class="folder {{ row_class.next() }}">{{ folder }} +        {% endfor %} +        {% for file in files %} +          <li class="file {{ row_class.next() }}">{{ file }} +        {% endfor %} +        </ul> + +    :param items: Each positional argument will be yielded in the order +        given for each cycle. + +    .. versionadded:: 2.1 +    """ + +    def __init__(self, *items: t.Any) -> None: +        if not items: +            raise RuntimeError("at least one item has to be provided") +        self.items = items +        self.pos = 0 + +    def reset(self) -> None: +        """Resets the current item to the first item.""" +        self.pos = 0 + +    @property +    def current(self) -> t.Any: +        """Return the current item. Equivalent to the item that will be +        returned next time :meth:`next` is called. +        """ +        return self.items[self.pos] + +    def next(self) -> t.Any: +        """Return the current item, then advance :attr:`current` to the +        next item. +        """ +        rv = self.current +        self.pos = (self.pos + 1) % len(self.items) +        return rv + +    __next__ = next + + +class Joiner: +    """A joining helper for templates.""" + +    def __init__(self, sep: str = ", ") -> None: +        self.sep = sep +        self.used = False + +    def __call__(self) -> str: +        if not self.used: +            self.used = True +            return "" +        return self.sep + + +class Namespace: +    """A namespace object that can hold arbitrary attributes.  It may be +    initialized from a dictionary or with keyword arguments.""" + +    def __init__(*args: t.Any, **kwargs: t.Any) -> None:  # noqa: B902 +        self, args = args[0], args[1:] +        self.__attrs = dict(*args, **kwargs) + +    def __getattribute__(self, name: str) -> t.Any: +        # __class__ is needed for the awaitable check in async mode +        if name in {"_Namespace__attrs", "__class__"}: +            return object.__getattribute__(self, name) +        try: +            return self.__attrs[name] +        except KeyError: +            raise AttributeError(name) from None + +    def __setitem__(self, name: str, value: t.Any) -> None: +        self.__attrs[name] = value + +    def __repr__(self) -> str: +        return f"<Namespace {self.__attrs!r}>" | 
