diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /contrib/python/prompt-toolkit/py3/prompt_toolkit/layout/processors.py | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'contrib/python/prompt-toolkit/py3/prompt_toolkit/layout/processors.py')
-rw-r--r-- | contrib/python/prompt-toolkit/py3/prompt_toolkit/layout/processors.py | 1029 |
1 files changed, 0 insertions, 1029 deletions
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/layout/processors.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/layout/processors.py deleted file mode 100644 index 722658a846..0000000000 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/layout/processors.py +++ /dev/null @@ -1,1029 +0,0 @@ -""" -Processors are little transformation blocks that transform the fragments list -from a buffer before the BufferControl will render it to the screen. - -They can insert fragments before or after, or highlight fragments by replacing the -fragment types. -""" -import re -from abc import ABCMeta, abstractmethod -from typing import ( - TYPE_CHECKING, - Callable, - Hashable, - List, - Optional, - Tuple, - Type, - Union, - cast, -) - -from prompt_toolkit.application.current import get_app -from prompt_toolkit.cache import SimpleCache -from prompt_toolkit.document import Document -from prompt_toolkit.filters import FilterOrBool, to_filter, vi_insert_multiple_mode -from prompt_toolkit.formatted_text import ( - AnyFormattedText, - StyleAndTextTuples, - to_formatted_text, -) -from prompt_toolkit.formatted_text.utils import fragment_list_len, fragment_list_to_text -from prompt_toolkit.search import SearchDirection -from prompt_toolkit.utils import to_int, to_str - -from .utils import explode_text_fragments - -if TYPE_CHECKING: - from .controls import BufferControl, UIContent - -__all__ = [ - "Processor", - "TransformationInput", - "Transformation", - "DummyProcessor", - "HighlightSearchProcessor", - "HighlightIncrementalSearchProcessor", - "HighlightSelectionProcessor", - "PasswordProcessor", - "HighlightMatchingBracketProcessor", - "DisplayMultipleCursors", - "BeforeInput", - "ShowArg", - "AfterInput", - "AppendAutoSuggestion", - "ConditionalProcessor", - "ShowLeadingWhiteSpaceProcessor", - "ShowTrailingWhiteSpaceProcessor", - "TabsProcessor", - "ReverseSearchProcessor", - "DynamicProcessor", - "merge_processors", -] - - -class Processor(metaclass=ABCMeta): - """ - Manipulate the fragments for a given line in a - :class:`~prompt_toolkit.layout.controls.BufferControl`. - """ - - @abstractmethod - def apply_transformation( - self, transformation_input: "TransformationInput" - ) -> "Transformation": - """ - Apply transformation. Returns a :class:`.Transformation` instance. - - :param transformation_input: :class:`.TransformationInput` object. - """ - return Transformation(transformation_input.fragments) - - -SourceToDisplay = Callable[[int], int] -DisplayToSource = Callable[[int], int] - - -class TransformationInput: - """ - :param buffer_control: :class:`.BufferControl` instance. - :param lineno: The number of the line to which we apply the processor. - :param source_to_display: A function that returns the position in the - `fragments` for any position in the source string. (This takes - previous processors into account.) - :param fragments: List of fragments that we can transform. (Received from the - previous processor.) - """ - - def __init__( - self, - buffer_control: "BufferControl", - document: Document, - lineno: int, - source_to_display: SourceToDisplay, - fragments: StyleAndTextTuples, - width: int, - height: int, - ) -> None: - - self.buffer_control = buffer_control - self.document = document - self.lineno = lineno - self.source_to_display = source_to_display - self.fragments = fragments - self.width = width - self.height = height - - def unpack( - self, - ) -> Tuple[ - "BufferControl", Document, int, SourceToDisplay, StyleAndTextTuples, int, int - ]: - return ( - self.buffer_control, - self.document, - self.lineno, - self.source_to_display, - self.fragments, - self.width, - self.height, - ) - - -class Transformation: - """ - Transformation result, as returned by :meth:`.Processor.apply_transformation`. - - Important: Always make sure that the length of `document.text` is equal to - the length of all the text in `fragments`! - - :param fragments: The transformed fragments. To be displayed, or to pass to - the next processor. - :param source_to_display: Cursor position transformation from original - string to transformed string. - :param display_to_source: Cursor position transformed from source string to - original string. - """ - - def __init__( - self, - fragments: StyleAndTextTuples, - source_to_display: Optional[SourceToDisplay] = None, - display_to_source: Optional[DisplayToSource] = None, - ) -> None: - - self.fragments = fragments - self.source_to_display = source_to_display or (lambda i: i) - self.display_to_source = display_to_source or (lambda i: i) - - -class DummyProcessor(Processor): - """ - A `Processor` that doesn't do anything. - """ - - def apply_transformation( - self, transformation_input: TransformationInput - ) -> Transformation: - return Transformation(transformation_input.fragments) - - -class HighlightSearchProcessor(Processor): - """ - Processor that highlights search matches in the document. - Note that this doesn't support multiline search matches yet. - - The style classes 'search' and 'search.current' will be applied to the - content. - """ - - _classname = "search" - _classname_current = "search.current" - - def _get_search_text(self, buffer_control: "BufferControl") -> str: - """ - The text we are searching for. - """ - return buffer_control.search_state.text - - def apply_transformation( - self, transformation_input: TransformationInput - ) -> Transformation: - - ( - buffer_control, - document, - lineno, - source_to_display, - fragments, - _, - _, - ) = transformation_input.unpack() - - search_text = self._get_search_text(buffer_control) - searchmatch_fragment = f" class:{self._classname} " - searchmatch_current_fragment = f" class:{self._classname_current} " - - if search_text and not get_app().is_done: - # For each search match, replace the style string. - line_text = fragment_list_to_text(fragments) - fragments = explode_text_fragments(fragments) - - if buffer_control.search_state.ignore_case(): - flags = re.IGNORECASE - else: - flags = re.RegexFlag(0) - - # Get cursor column. - cursor_column: Optional[int] - if document.cursor_position_row == lineno: - cursor_column = source_to_display(document.cursor_position_col) - else: - cursor_column = None - - for match in re.finditer(re.escape(search_text), line_text, flags=flags): - if cursor_column is not None: - on_cursor = match.start() <= cursor_column < match.end() - else: - on_cursor = False - - for i in range(match.start(), match.end()): - old_fragment, text, *_ = fragments[i] - if on_cursor: - fragments[i] = ( - old_fragment + searchmatch_current_fragment, - fragments[i][1], - ) - else: - fragments[i] = ( - old_fragment + searchmatch_fragment, - fragments[i][1], - ) - - return Transformation(fragments) - - -class HighlightIncrementalSearchProcessor(HighlightSearchProcessor): - """ - Highlight the search terms that are used for highlighting the incremental - search. The style class 'incsearch' will be applied to the content. - - Important: this requires the `preview_search=True` flag to be set for the - `BufferControl`. Otherwise, the cursor position won't be set to the search - match while searching, and nothing happens. - """ - - _classname = "incsearch" - _classname_current = "incsearch.current" - - def _get_search_text(self, buffer_control: "BufferControl") -> str: - """ - The text we are searching for. - """ - # When the search buffer has focus, take that text. - search_buffer = buffer_control.search_buffer - if search_buffer is not None and search_buffer.text: - return search_buffer.text - return "" - - -class HighlightSelectionProcessor(Processor): - """ - Processor that highlights the selection in the document. - """ - - def apply_transformation( - self, transformation_input: TransformationInput - ) -> Transformation: - ( - buffer_control, - document, - lineno, - source_to_display, - fragments, - _, - _, - ) = transformation_input.unpack() - - selected_fragment = " class:selected " - - # In case of selection, highlight all matches. - selection_at_line = document.selection_range_at_line(lineno) - - if selection_at_line: - from_, to = selection_at_line - from_ = source_to_display(from_) - to = source_to_display(to) - - fragments = explode_text_fragments(fragments) - - if from_ == 0 and to == 0 and len(fragments) == 0: - # When this is an empty line, insert a space in order to - # visualise the selection. - return Transformation([(selected_fragment, " ")]) - else: - for i in range(from_, to): - if i < len(fragments): - old_fragment, old_text, *_ = fragments[i] - fragments[i] = (old_fragment + selected_fragment, old_text) - elif i == len(fragments): - fragments.append((selected_fragment, " ")) - - return Transformation(fragments) - - -class PasswordProcessor(Processor): - """ - Processor that masks the input. (For passwords.) - - :param char: (string) Character to be used. "*" by default. - """ - - def __init__(self, char: str = "*") -> None: - self.char = char - - def apply_transformation(self, ti: TransformationInput) -> Transformation: - fragments: StyleAndTextTuples = cast( - StyleAndTextTuples, - [ - (style, self.char * len(text), *handler) - for style, text, *handler in ti.fragments - ], - ) - - return Transformation(fragments) - - -class HighlightMatchingBracketProcessor(Processor): - """ - When the cursor is on or right after a bracket, it highlights the matching - bracket. - - :param max_cursor_distance: Only highlight matching brackets when the - cursor is within this distance. (From inside a `Processor`, we can't - know which lines will be visible on the screen. But we also don't want - to scan the whole document for matching brackets on each key press, so - we limit to this value.) - """ - - _closing_braces = "])}>" - - def __init__( - self, chars: str = "[](){}<>", max_cursor_distance: int = 1000 - ) -> None: - self.chars = chars - self.max_cursor_distance = max_cursor_distance - - self._positions_cache: SimpleCache[ - Hashable, List[Tuple[int, int]] - ] = SimpleCache(maxsize=8) - - def _get_positions_to_highlight(self, document: Document) -> List[Tuple[int, int]]: - """ - Return a list of (row, col) tuples that need to be highlighted. - """ - pos: Optional[int] - - # Try for the character under the cursor. - if document.current_char and document.current_char in self.chars: - pos = document.find_matching_bracket_position( - start_pos=document.cursor_position - self.max_cursor_distance, - end_pos=document.cursor_position + self.max_cursor_distance, - ) - - # Try for the character before the cursor. - elif ( - document.char_before_cursor - and document.char_before_cursor in self._closing_braces - and document.char_before_cursor in self.chars - ): - document = Document(document.text, document.cursor_position - 1) - - pos = document.find_matching_bracket_position( - start_pos=document.cursor_position - self.max_cursor_distance, - end_pos=document.cursor_position + self.max_cursor_distance, - ) - else: - pos = None - - # Return a list of (row, col) tuples that need to be highlighted. - if pos: - pos += document.cursor_position # pos is relative. - row, col = document.translate_index_to_position(pos) - return [ - (row, col), - (document.cursor_position_row, document.cursor_position_col), - ] - else: - return [] - - def apply_transformation( - self, transformation_input: TransformationInput - ) -> Transformation: - - ( - buffer_control, - document, - lineno, - source_to_display, - fragments, - _, - _, - ) = transformation_input.unpack() - - # When the application is in the 'done' state, don't highlight. - if get_app().is_done: - return Transformation(fragments) - - # Get the highlight positions. - key = (get_app().render_counter, document.text, document.cursor_position) - positions = self._positions_cache.get( - key, lambda: self._get_positions_to_highlight(document) - ) - - # Apply if positions were found at this line. - if positions: - for row, col in positions: - if row == lineno: - col = source_to_display(col) - fragments = explode_text_fragments(fragments) - style, text, *_ = fragments[col] - - if col == document.cursor_position_col: - style += " class:matching-bracket.cursor " - else: - style += " class:matching-bracket.other " - - fragments[col] = (style, text) - - return Transformation(fragments) - - -class DisplayMultipleCursors(Processor): - """ - When we're in Vi block insert mode, display all the cursors. - """ - - def apply_transformation( - self, transformation_input: TransformationInput - ) -> Transformation: - - ( - buffer_control, - document, - lineno, - source_to_display, - fragments, - _, - _, - ) = transformation_input.unpack() - - buff = buffer_control.buffer - - if vi_insert_multiple_mode(): - cursor_positions = buff.multiple_cursor_positions - fragments = explode_text_fragments(fragments) - - # If any cursor appears on the current line, highlight that. - start_pos = document.translate_row_col_to_index(lineno, 0) - end_pos = start_pos + len(document.lines[lineno]) - - fragment_suffix = " class:multiple-cursors" - - for p in cursor_positions: - if start_pos <= p <= end_pos: - column = source_to_display(p - start_pos) - - # Replace fragment. - try: - style, text, *_ = fragments[column] - except IndexError: - # Cursor needs to be displayed after the current text. - fragments.append((fragment_suffix, " ")) - else: - style += fragment_suffix - fragments[column] = (style, text) - - return Transformation(fragments) - else: - return Transformation(fragments) - - -class BeforeInput(Processor): - """ - Insert text before the input. - - :param text: This can be either plain text or formatted text - (or a callable that returns any of those). - :param style: style to be applied to this prompt/prefix. - """ - - def __init__(self, text: AnyFormattedText, style: str = "") -> None: - self.text = text - self.style = style - - def apply_transformation(self, ti: TransformationInput) -> Transformation: - source_to_display: Optional[SourceToDisplay] - display_to_source: Optional[DisplayToSource] - - if ti.lineno == 0: - # Get fragments. - fragments_before = to_formatted_text(self.text, self.style) - fragments = fragments_before + ti.fragments - - shift_position = fragment_list_len(fragments_before) - source_to_display = lambda i: i + shift_position - display_to_source = lambda i: i - shift_position - else: - fragments = ti.fragments - source_to_display = None - display_to_source = None - - return Transformation( - fragments, - source_to_display=source_to_display, - display_to_source=display_to_source, - ) - - def __repr__(self) -> str: - return f"BeforeInput({self.text!r}, {self.style!r})" - - -class ShowArg(BeforeInput): - """ - Display the 'arg' in front of the input. - - This was used by the `PromptSession`, but now it uses the - `Window.get_line_prefix` function instead. - """ - - def __init__(self) -> None: - super().__init__(self._get_text_fragments) - - def _get_text_fragments(self) -> StyleAndTextTuples: - app = get_app() - if app.key_processor.arg is None: - return [] - else: - arg = app.key_processor.arg - - return [ - ("class:prompt.arg", "(arg: "), - ("class:prompt.arg.text", str(arg)), - ("class:prompt.arg", ") "), - ] - - def __repr__(self) -> str: - return "ShowArg()" - - -class AfterInput(Processor): - """ - Insert text after the input. - - :param text: This can be either plain text or formatted text - (or a callable that returns any of those). - :param style: style to be applied to this prompt/prefix. - """ - - def __init__(self, text: AnyFormattedText, style: str = "") -> None: - self.text = text - self.style = style - - def apply_transformation(self, ti: TransformationInput) -> Transformation: - # Insert fragments after the last line. - if ti.lineno == ti.document.line_count - 1: - # Get fragments. - fragments_after = to_formatted_text(self.text, self.style) - return Transformation(fragments=ti.fragments + fragments_after) - else: - return Transformation(fragments=ti.fragments) - - def __repr__(self) -> str: - return f"{self.__class__.__name__}({self.text!r}, style={self.style!r})" - - -class AppendAutoSuggestion(Processor): - """ - Append the auto suggestion to the input. - (The user can then press the right arrow the insert the suggestion.) - """ - - def __init__(self, style: str = "class:auto-suggestion") -> None: - self.style = style - - def apply_transformation(self, ti: TransformationInput) -> Transformation: - # Insert fragments after the last line. - if ti.lineno == ti.document.line_count - 1: - buffer = ti.buffer_control.buffer - - if buffer.suggestion and ti.document.is_cursor_at_the_end: - suggestion = buffer.suggestion.text - else: - suggestion = "" - - return Transformation(fragments=ti.fragments + [(self.style, suggestion)]) - else: - return Transformation(fragments=ti.fragments) - - -class ShowLeadingWhiteSpaceProcessor(Processor): - """ - Make leading whitespace visible. - - :param get_char: Callable that returns one character. - """ - - def __init__( - self, - get_char: Optional[Callable[[], str]] = None, - style: str = "class:leading-whitespace", - ) -> None: - def default_get_char() -> str: - if "\xb7".encode(get_app().output.encoding(), "replace") == b"?": - return "." - else: - return "\xb7" - - self.style = style - self.get_char = get_char or default_get_char - - def apply_transformation(self, ti: TransformationInput) -> Transformation: - fragments = ti.fragments - - # Walk through all te fragments. - if fragments and fragment_list_to_text(fragments).startswith(" "): - t = (self.style, self.get_char()) - fragments = explode_text_fragments(fragments) - - for i in range(len(fragments)): - if fragments[i][1] == " ": - fragments[i] = t - else: - break - - return Transformation(fragments) - - -class ShowTrailingWhiteSpaceProcessor(Processor): - """ - Make trailing whitespace visible. - - :param get_char: Callable that returns one character. - """ - - def __init__( - self, - get_char: Optional[Callable[[], str]] = None, - style: str = "class:training-whitespace", - ) -> None: - def default_get_char() -> str: - if "\xb7".encode(get_app().output.encoding(), "replace") == b"?": - return "." - else: - return "\xb7" - - self.style = style - self.get_char = get_char or default_get_char - - def apply_transformation(self, ti: TransformationInput) -> Transformation: - fragments = ti.fragments - - if fragments and fragments[-1][1].endswith(" "): - t = (self.style, self.get_char()) - fragments = explode_text_fragments(fragments) - - # Walk backwards through all te fragments and replace whitespace. - for i in range(len(fragments) - 1, -1, -1): - char = fragments[i][1] - if char == " ": - fragments[i] = t - else: - break - - return Transformation(fragments) - - -class TabsProcessor(Processor): - """ - Render tabs as spaces (instead of ^I) or make them visible (for instance, - by replacing them with dots.) - - :param tabstop: Horizontal space taken by a tab. (`int` or callable that - returns an `int`). - :param char1: Character or callable that returns a character (text of - length one). This one is used for the first space taken by the tab. - :param char2: Like `char1`, but for the rest of the space. - """ - - def __init__( - self, - tabstop: Union[int, Callable[[], int]] = 4, - char1: Union[str, Callable[[], str]] = "|", - char2: Union[str, Callable[[], str]] = "\u2508", - style: str = "class:tab", - ) -> None: - - self.char1 = char1 - self.char2 = char2 - self.tabstop = tabstop - self.style = style - - def apply_transformation(self, ti: TransformationInput) -> Transformation: - tabstop = to_int(self.tabstop) - style = self.style - - # Create separator for tabs. - separator1 = to_str(self.char1) - separator2 = to_str(self.char2) - - # Transform fragments. - fragments = explode_text_fragments(ti.fragments) - - position_mappings = {} - result_fragments: StyleAndTextTuples = [] - pos = 0 - - for i, fragment_and_text in enumerate(fragments): - position_mappings[i] = pos - - if fragment_and_text[1] == "\t": - # Calculate how many characters we have to insert. - count = tabstop - (pos % tabstop) - if count == 0: - count = tabstop - - # Insert tab. - result_fragments.append((style, separator1)) - result_fragments.append((style, separator2 * (count - 1))) - pos += count - else: - result_fragments.append(fragment_and_text) - pos += 1 - - position_mappings[len(fragments)] = pos - # Add `pos+1` to mapping, because the cursor can be right after the - # line as well. - position_mappings[len(fragments) + 1] = pos + 1 - - def source_to_display(from_position: int) -> int: - "Maps original cursor position to the new one." - return position_mappings[from_position] - - def display_to_source(display_pos: int) -> int: - "Maps display cursor position to the original one." - position_mappings_reversed = {v: k for k, v in position_mappings.items()} - - while display_pos >= 0: - try: - return position_mappings_reversed[display_pos] - except KeyError: - display_pos -= 1 - return 0 - - return Transformation( - result_fragments, - source_to_display=source_to_display, - display_to_source=display_to_source, - ) - - -class ReverseSearchProcessor(Processor): - """ - Process to display the "(reverse-i-search)`...`:..." stuff around - the search buffer. - - Note: This processor is meant to be applied to the BufferControl that - contains the search buffer, it's not meant for the original input. - """ - - _excluded_input_processors: List[Type[Processor]] = [ - HighlightSearchProcessor, - HighlightSelectionProcessor, - BeforeInput, - AfterInput, - ] - - def _get_main_buffer( - self, buffer_control: "BufferControl" - ) -> Optional["BufferControl"]: - from prompt_toolkit.layout.controls import BufferControl - - prev_control = get_app().layout.search_target_buffer_control - if ( - isinstance(prev_control, BufferControl) - and prev_control.search_buffer_control == buffer_control - ): - return prev_control - return None - - def _content( - self, main_control: "BufferControl", ti: TransformationInput - ) -> "UIContent": - from prompt_toolkit.layout.controls import BufferControl - - # Emulate the BufferControl through which we are searching. - # For this we filter out some of the input processors. - excluded_processors = tuple(self._excluded_input_processors) - - def filter_processor(item: Processor) -> Optional[Processor]: - """Filter processors from the main control that we want to disable - here. This returns either an accepted processor or None.""" - # For a `_MergedProcessor`, check each individual processor, recursively. - if isinstance(item, _MergedProcessor): - accepted_processors = [filter_processor(p) for p in item.processors] - return merge_processors( - [p for p in accepted_processors if p is not None] - ) - - # For a `ConditionalProcessor`, check the body. - elif isinstance(item, ConditionalProcessor): - p = filter_processor(item.processor) - if p: - return ConditionalProcessor(p, item.filter) - - # Otherwise, check the processor itself. - else: - if not isinstance(item, excluded_processors): - return item - - return None - - filtered_processor = filter_processor( - merge_processors(main_control.input_processors or []) - ) - highlight_processor = HighlightIncrementalSearchProcessor() - - if filtered_processor: - new_processors = [filtered_processor, highlight_processor] - else: - new_processors = [highlight_processor] - - from .controls import SearchBufferControl - - assert isinstance(ti.buffer_control, SearchBufferControl) - - buffer_control = BufferControl( - buffer=main_control.buffer, - input_processors=new_processors, - include_default_input_processors=False, - lexer=main_control.lexer, - preview_search=True, - search_buffer_control=ti.buffer_control, - ) - - return buffer_control.create_content(ti.width, ti.height, preview_search=True) - - def apply_transformation(self, ti: TransformationInput) -> Transformation: - from .controls import SearchBufferControl - - assert isinstance( - ti.buffer_control, SearchBufferControl - ), "`ReverseSearchProcessor` should be applied to a `SearchBufferControl` only." - - source_to_display: Optional[SourceToDisplay] - display_to_source: Optional[DisplayToSource] - - main_control = self._get_main_buffer(ti.buffer_control) - - if ti.lineno == 0 and main_control: - content = self._content(main_control, ti) - - # Get the line from the original document for this search. - line_fragments = content.get_line(content.cursor_position.y) - - if main_control.search_state.direction == SearchDirection.FORWARD: - direction_text = "i-search" - else: - direction_text = "reverse-i-search" - - fragments_before: StyleAndTextTuples = [ - ("class:prompt.search", "("), - ("class:prompt.search", direction_text), - ("class:prompt.search", ")`"), - ] - - fragments = ( - fragments_before - + [ - ("class:prompt.search.text", fragment_list_to_text(ti.fragments)), - ("", "': "), - ] - + line_fragments - ) - - shift_position = fragment_list_len(fragments_before) - source_to_display = lambda i: i + shift_position - display_to_source = lambda i: i - shift_position - else: - source_to_display = None - display_to_source = None - fragments = ti.fragments - - return Transformation( - fragments, - source_to_display=source_to_display, - display_to_source=display_to_source, - ) - - -class ConditionalProcessor(Processor): - """ - Processor that applies another processor, according to a certain condition. - Example:: - - # Create a function that returns whether or not the processor should - # currently be applied. - def highlight_enabled(): - return true_or_false - - # Wrapped it in a `ConditionalProcessor` for usage in a `BufferControl`. - BufferControl(input_processors=[ - ConditionalProcessor(HighlightSearchProcessor(), - Condition(highlight_enabled))]) - - :param processor: :class:`.Processor` instance. - :param filter: :class:`~prompt_toolkit.filters.Filter` instance. - """ - - def __init__(self, processor: Processor, filter: FilterOrBool) -> None: - self.processor = processor - self.filter = to_filter(filter) - - def apply_transformation( - self, transformation_input: TransformationInput - ) -> Transformation: - # Run processor when enabled. - if self.filter(): - return self.processor.apply_transformation(transformation_input) - else: - return Transformation(transformation_input.fragments) - - def __repr__(self) -> str: - return "{}(processor={!r}, filter={!r})".format( - self.__class__.__name__, - self.processor, - self.filter, - ) - - -class DynamicProcessor(Processor): - """ - Processor class that dynamically returns any Processor. - - :param get_processor: Callable that returns a :class:`.Processor` instance. - """ - - def __init__(self, get_processor: Callable[[], Optional[Processor]]) -> None: - self.get_processor = get_processor - - def apply_transformation(self, ti: TransformationInput) -> Transformation: - processor = self.get_processor() or DummyProcessor() - return processor.apply_transformation(ti) - - -def merge_processors(processors: List[Processor]) -> Processor: - """ - Merge multiple `Processor` objects into one. - """ - if len(processors) == 0: - return DummyProcessor() - - if len(processors) == 1: - return processors[0] # Nothing to merge. - - return _MergedProcessor(processors) - - -class _MergedProcessor(Processor): - """ - Processor that groups multiple other `Processor` objects, but exposes an - API as if it is one `Processor`. - """ - - def __init__(self, processors: List[Processor]): - self.processors = processors - - def apply_transformation(self, ti: TransformationInput) -> Transformation: - source_to_display_functions = [ti.source_to_display] - display_to_source_functions = [] - fragments = ti.fragments - - def source_to_display(i: int) -> int: - """Translate x position from the buffer to the x position in the - processor fragments list.""" - for f in source_to_display_functions: - i = f(i) - return i - - for p in self.processors: - transformation = p.apply_transformation( - TransformationInput( - ti.buffer_control, - ti.document, - ti.lineno, - source_to_display, - fragments, - ti.width, - ti.height, - ) - ) - fragments = transformation.fragments - display_to_source_functions.append(transformation.display_to_source) - source_to_display_functions.append(transformation.source_to_display) - - def display_to_source(i: int) -> int: - for f in reversed(display_to_source_functions): - i = f(i) - return i - - # In the case of a nested _MergedProcessor, each processor wants to - # receive a 'source_to_display' function (as part of the - # TransformationInput) that has everything in the chain before - # included, because it can be called as part of the - # `apply_transformation` function. However, this first - # `source_to_display` should not be part of the output that we are - # returning. (This is the most consistent with `display_to_source`.) - del source_to_display_functions[:1] - - return Transformation(fragments, source_to_display, display_to_source) |