diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/web/_flatten.py | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/web/_flatten.py')
-rw-r--r-- | contrib/python/Twisted/py3/twisted/web/_flatten.py | 487 |
1 files changed, 487 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/web/_flatten.py b/contrib/python/Twisted/py3/twisted/web/_flatten.py new file mode 100644 index 0000000000..87a8bf2dfb --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/web/_flatten.py @@ -0,0 +1,487 @@ +# -*- test-case-name: twisted.web.test.test_flatten,twisted.web.test.test_template -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Context-free flattener/serializer for rendering Python objects, possibly +complex or arbitrarily nested, as strings. +""" +from __future__ import annotations + +from inspect import iscoroutine +from io import BytesIO +from sys import exc_info +from traceback import extract_tb +from types import GeneratorType +from typing import ( + Any, + Callable, + Coroutine, + Generator, + List, + Mapping, + Optional, + Sequence, + Tuple, + TypeVar, + Union, + cast, +) + +from twisted.internet.defer import Deferred, ensureDeferred +from twisted.python.compat import nativeString +from twisted.python.failure import Failure +from twisted.web._stan import CDATA, CharRef, Comment, Tag, slot, voidElements +from twisted.web.error import FlattenerError, UnfilledSlot, UnsupportedType +from twisted.web.iweb import IRenderable, IRequest + +T = TypeVar("T") + +FlattenableRecursive = Any +""" +For documentation purposes, read C{FlattenableRecursive} as L{Flattenable}. +However, since mypy doesn't support recursive type definitions (yet?), +we'll put Any in the actual definition. +""" + +Flattenable = Union[ + bytes, + str, + slot, + CDATA, + Comment, + Tag, + Tuple[FlattenableRecursive, ...], + List[FlattenableRecursive], + Generator[FlattenableRecursive, None, None], + CharRef, + Deferred[FlattenableRecursive], + Coroutine[Deferred[FlattenableRecursive], object, FlattenableRecursive], + IRenderable, +] +""" +Type alias containing all types that can be flattened by L{flatten()}. +""" + +# The maximum number of bytes to synchronously accumulate in the flattener +# buffer before delivering them onwards. +BUFFER_SIZE = 2**16 + + +def escapeForContent(data: Union[bytes, str]) -> bytes: + """ + Escape some character or UTF-8 byte data for inclusion in an HTML or XML + document, by replacing metacharacters (C{&<>}) with their entity + equivalents (C{&<>}). + + This is used as an input to L{_flattenElement}'s C{dataEscaper} parameter. + + @param data: The string to escape. + + @return: The quoted form of C{data}. If C{data} is L{str}, return a utf-8 + encoded string. + """ + if isinstance(data, str): + data = data.encode("utf-8") + data = data.replace(b"&", b"&").replace(b"<", b"<").replace(b">", b">") + return data + + +def attributeEscapingDoneOutside(data: Union[bytes, str]) -> bytes: + """ + Escape some character or UTF-8 byte data for inclusion in the top level of + an attribute. L{attributeEscapingDoneOutside} actually passes the data + through unchanged, because L{writeWithAttributeEscaping} handles the + quoting of the text within attributes outside the generator returned by + L{_flattenElement}; this is used as the C{dataEscaper} argument to that + L{_flattenElement} call so that that generator does not redundantly escape + its text output. + + @param data: The string to escape. + + @return: The string, unchanged, except for encoding. + """ + if isinstance(data, str): + return data.encode("utf-8") + return data + + +def writeWithAttributeEscaping( + write: Callable[[bytes], object] +) -> Callable[[bytes], None]: + """ + Decorate a C{write} callable so that all output written is properly quoted + for inclusion within an XML attribute value. + + If a L{Tag <twisted.web.template.Tag>} C{x} is flattened within the context + of the contents of another L{Tag <twisted.web.template.Tag>} C{y}, the + metacharacters (C{<>&"}) delimiting C{x} should be passed through + unchanged, but the textual content of C{x} should still be quoted, as + usual. For example: C{<y><x>&</x></y>}. That is the default behavior + of L{_flattenElement} when L{escapeForContent} is passed as the + C{dataEscaper}. + + However, when a L{Tag <twisted.web.template.Tag>} C{x} is flattened within + the context of an I{attribute} of another L{Tag <twisted.web.template.Tag>} + C{y}, then the metacharacters delimiting C{x} should be quoted so that it + can be parsed from the attribute's value. In the DOM itself, this is not a + valid thing to do, but given that renderers and slots may be freely moved + around in a L{twisted.web.template} template, it is a condition which may + arise in a document and must be handled in a way which produces valid + output. So, for example, you should be able to get C{<y attr="<x />" + />}. This should also be true for other XML/HTML meta-constructs such as + comments and CDATA, so if you were to serialize a L{comment + <twisted.web.template.Comment>} in an attribute you should get C{<y + attr="<-- comment -->" />}. Therefore in order to capture these + meta-characters, flattening is done with C{write} callable that is wrapped + with L{writeWithAttributeEscaping}. + + The final case, and hopefully the much more common one as compared to + serializing L{Tag <twisted.web.template.Tag>} and arbitrary L{IRenderable} + objects within an attribute, is to serialize a simple string, and those + should be passed through for L{writeWithAttributeEscaping} to quote + without applying a second, redundant level of quoting. + + @param write: A callable which will be invoked with the escaped L{bytes}. + + @return: A callable that writes data with escaping. + """ + + def _write(data: bytes) -> None: + write(escapeForContent(data).replace(b'"', b""")) + + return _write + + +def escapedCDATA(data: Union[bytes, str]) -> bytes: + """ + Escape CDATA for inclusion in a document. + + @param data: The string to escape. + + @return: The quoted form of C{data}. If C{data} is unicode, return a utf-8 + encoded string. + """ + if isinstance(data, str): + data = data.encode("utf-8") + return data.replace(b"]]>", b"]]]]><![CDATA[>") + + +def escapedComment(data: Union[bytes, str]) -> bytes: + """ + Within comments the sequence C{-->} can be mistaken as the end of the comment. + To ensure consistent parsing and valid output the sequence is replaced with C{-->}. + Furthermore, whitespace is added when a comment ends in a dash. This is done to break + the connection of the ending C{-} with the closing C{-->}. + + @param data: The string to escape. + + @return: The quoted form of C{data}. If C{data} is unicode, return a utf-8 + encoded string. + """ + if isinstance(data, str): + data = data.encode("utf-8") + data = data.replace(b"-->", b"-->") + if data and data[-1:] == b"-": + data += b" " + return data + + +def _getSlotValue( + name: str, + slotData: Sequence[Optional[Mapping[str, Flattenable]]], + default: Optional[Flattenable] = None, +) -> Flattenable: + """ + Find the value of the named slot in the given stack of slot data. + """ + for slotFrame in reversed(slotData): + if slotFrame is not None and name in slotFrame: + return slotFrame[name] + else: + if default is not None: + return default + raise UnfilledSlot(name) + + +def _fork(d: Deferred[T]) -> Deferred[T]: + """ + Create a new L{Deferred} based on C{d} that will fire and fail with C{d}'s + result or error, but will not modify C{d}'s callback type. + """ + d2: Deferred[T] = Deferred(lambda _: d.cancel()) + + def callback(result: T) -> T: + d2.callback(result) + return result + + def errback(failure: Failure) -> Failure: + d2.errback(failure) + return failure + + d.addCallbacks(callback, errback) + return d2 + + +def _flattenElement( + request: Optional[IRequest], + root: Flattenable, + write: Callable[[bytes], object], + slotData: List[Optional[Mapping[str, Flattenable]]], + renderFactory: Optional[IRenderable], + dataEscaper: Callable[[Union[bytes, str]], bytes], + # This is annotated as Generator[T, None, None] instead of Iterator[T] + # because mypy does not consider an Iterator to be an instance of + # GeneratorType. +) -> Generator[Union[Generator[Any, Any, Any], Deferred[Flattenable]], None, None]: + """ + Make C{root} slightly more flat by yielding all its immediate contents as + strings, deferreds or generators that are recursive calls to itself. + + @param request: A request object which will be passed to + L{IRenderable.render}. + + @param root: An object to be made flatter. This may be of type C{unicode}, + L{str}, L{slot}, L{Tag <twisted.web.template.Tag>}, L{tuple}, L{list}, + L{types.GeneratorType}, L{Deferred}, or an object that implements + L{IRenderable}. + + @param write: A callable which will be invoked with each L{bytes} produced + by flattening C{root}. + + @param slotData: A L{list} of L{dict} mapping L{str} slot names to data + with which those slots will be replaced. + + @param renderFactory: If not L{None}, an object that provides + L{IRenderable}. + + @param dataEscaper: A 1-argument callable which takes L{bytes} or + L{unicode} and returns L{bytes}, quoted as appropriate for the + rendering context. This is really only one of two values: + L{attributeEscapingDoneOutside} or L{escapeForContent}, depending on + whether the rendering context is within an attribute or not. See the + explanation in L{writeWithAttributeEscaping}. + + @return: An iterator that eventually writes L{bytes} to C{write}. + It can yield other iterators or L{Deferred}s; if it yields another + iterator, the caller will iterate it; if it yields a L{Deferred}, + the result of that L{Deferred} will be another generator, in which + case it is iterated. See L{_flattenTree} for the trampoline that + consumes said values. + """ + + def keepGoing( + newRoot: Flattenable, + dataEscaper: Callable[[Union[bytes, str]], bytes] = dataEscaper, + renderFactory: Optional[IRenderable] = renderFactory, + write: Callable[[bytes], object] = write, + ) -> Generator[Union[Flattenable, Deferred[Flattenable]], None, None]: + return _flattenElement( + request, newRoot, write, slotData, renderFactory, dataEscaper + ) + + def keepGoingAsync(result: Deferred[Flattenable]) -> Deferred[Flattenable]: + return result.addCallback(keepGoing) + + if isinstance(root, (bytes, str)): + write(dataEscaper(root)) + elif isinstance(root, slot): + slotValue = _getSlotValue(root.name, slotData, root.default) + yield keepGoing(slotValue) + elif isinstance(root, CDATA): + write(b"<![CDATA[") + write(escapedCDATA(root.data)) + write(b"]]>") + elif isinstance(root, Comment): + write(b"<!--") + write(escapedComment(root.data)) + write(b"-->") + elif isinstance(root, Tag): + slotData.append(root.slotData) + rendererName = root.render + if rendererName is not None: + if renderFactory is None: + raise ValueError( + f'Tag wants to be rendered by method "{rendererName}" ' + f"but is not contained in any IRenderable" + ) + rootClone = root.clone(False) + rootClone.render = None + renderMethod = renderFactory.lookupRenderMethod(rendererName) + result = renderMethod(request, rootClone) + yield keepGoing(result) + slotData.pop() + return + + if not root.tagName: + yield keepGoing(root.children) + return + + write(b"<") + if isinstance(root.tagName, str): + tagName = root.tagName.encode("ascii") + else: + tagName = root.tagName + write(tagName) + for k, v in root.attributes.items(): + if isinstance(k, str): + k = k.encode("ascii") + write(b" " + k + b'="') + # Serialize the contents of the attribute, wrapping the results of + # that serialization so that _everything_ is quoted. + yield keepGoing( + v, attributeEscapingDoneOutside, write=writeWithAttributeEscaping(write) + ) + write(b'"') + if root.children or nativeString(tagName) not in voidElements: + write(b">") + # Regardless of whether we're in an attribute or not, switch back + # to the escapeForContent dataEscaper. The contents of a tag must + # be quoted no matter what; in the top-level document, just so + # they're valid, and if they're within an attribute, they have to + # be quoted so that after applying the *un*-quoting required to re- + # parse the tag within the attribute, all the quoting is still + # correct. + yield keepGoing(root.children, escapeForContent) + write(b"</" + tagName + b">") + else: + write(b" />") + + elif isinstance(root, (tuple, list, GeneratorType)): + for element in root: + yield keepGoing(element) + elif isinstance(root, CharRef): + escaped = "&#%d;" % (root.ordinal,) + write(escaped.encode("ascii")) + elif isinstance(root, Deferred): + yield keepGoingAsync(_fork(root)) + elif iscoroutine(root): + yield keepGoingAsync( + Deferred.fromCoroutine( + cast(Coroutine[Deferred[Flattenable], object, Flattenable], root) + ) + ) + elif IRenderable.providedBy(root): + result = root.render(request) + yield keepGoing(result, renderFactory=root) + else: + raise UnsupportedType(root) + + +async def _flattenTree( + request: Optional[IRequest], root: Flattenable, write: Callable[[bytes], object] +) -> None: + """ + Make C{root} into an iterable of L{bytes} and L{Deferred} by doing a depth + first traversal of the tree. + + @param request: A request object which will be passed to + L{IRenderable.render}. + + @param root: An object to be made flatter. This may be of type C{unicode}, + L{bytes}, L{slot}, L{Tag <twisted.web.template.Tag>}, L{tuple}, + L{list}, L{types.GeneratorType}, L{Deferred}, or something providing + L{IRenderable}. + + @param write: A callable which will be invoked with each L{bytes} produced + by flattening C{root}. + + @return: A C{Deferred}-returning coroutine that resolves to C{None}. + """ + buf = [] + bufSize = 0 + + # Accumulate some bytes up to the buffer size so that we don't annoy the + # upstream writer with a million tiny string. + def bufferedWrite(bs: bytes) -> None: + nonlocal bufSize + buf.append(bs) + bufSize += len(bs) + if bufSize >= BUFFER_SIZE: + flushBuffer() + + # Deliver the buffered content to the upstream writer as a single string. + # This is how a "big enough" buffer gets delivered, how a buffer of any + # size is delivered before execution is suspended to wait for an + # asynchronous value, and how anything left in the buffer when we're + # finished is delivered. + def flushBuffer() -> None: + nonlocal bufSize + if bufSize > 0: + write(b"".join(buf)) + del buf[:] + bufSize = 0 + + stack: List[Generator[Any, Any, Any]] = [ + _flattenElement(request, root, bufferedWrite, [], None, escapeForContent) + ] + + while stack: + try: + frame = stack[-1].gi_frame + element = next(stack[-1]) + if isinstance(element, Deferred): + # Before suspending flattening for an unknown amount of time, + # flush whatever data we have collected so far. + flushBuffer() + element = await element + except StopIteration: + stack.pop() + except Exception as e: + stack.pop() + roots = [] + for generator in stack: + roots.append(generator.gi_frame.f_locals["root"]) + roots.append(frame.f_locals["root"]) + raise FlattenerError(e, roots, extract_tb(exc_info()[2])) + else: + stack.append(element) + + # Flush any data that remains in the buffer before finishing. + flushBuffer() + + +def flatten( + request: Optional[IRequest], root: Flattenable, write: Callable[[bytes], object] +) -> Deferred[None]: + """ + Incrementally write out a string representation of C{root} using C{write}. + + In order to create a string representation, C{root} will be decomposed into + simpler objects which will themselves be decomposed and so on until strings + or objects which can easily be converted to strings are encountered. + + @param request: A request object which will be passed to the C{render} + method of any L{IRenderable} provider which is encountered. + + @param root: An object to be made flatter. This may be of type L{str}, + L{bytes}, L{slot}, L{Tag <twisted.web.template.Tag>}, L{tuple}, + L{list}, L{types.GeneratorType}, L{Deferred}, or something that + provides L{IRenderable}. + + @param write: A callable which will be invoked with each L{bytes} produced + by flattening C{root}. + + @return: A L{Deferred} which will be called back with C{None} when C{root} + has been completely flattened into C{write} or which will be errbacked + if an unexpected exception occurs. + """ + return ensureDeferred(_flattenTree(request, root, write)) + + +def flattenString(request: Optional[IRequest], root: Flattenable) -> Deferred[bytes]: + """ + Collate a string representation of C{root} into a single string. + + This is basically gluing L{flatten} to an L{io.BytesIO} and returning + the results. See L{flatten} for the exact meanings of C{request} and + C{root}. + + @return: A L{Deferred} which will be called back with a single UTF-8 encoded + string as its result when C{root} has been completely flattened or which + will be errbacked if an unexpected exception occurs. + """ + io = BytesIO() + d = flatten(request, root, io.write) + d.addCallback(lambda _: io.getvalue()) + return cast(Deferred[bytes], d) |