diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
commit | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch) | |
tree | 012bb94d777798f1f56ac1cec429509766d05181 /contrib/python/prompt-toolkit/py3/prompt_toolkit/buffer.py | |
parent | 6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff) | |
download | ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/prompt-toolkit/py3/prompt_toolkit/buffer.py')
-rw-r--r-- | contrib/python/prompt-toolkit/py3/prompt_toolkit/buffer.py | 4026 |
1 files changed, 2013 insertions, 2013 deletions
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/buffer.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/buffer.py index 105d3cc07d..67956d8f54 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/buffer.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/buffer.py @@ -1,2014 +1,2014 @@ -""" -Data structures for the Buffer. -It holds the text, cursor position, history, etc... -""" -import asyncio -import logging -import os -import re -import shlex -import shutil -import subprocess -import tempfile -from collections import deque -from enum import Enum -from functools import wraps -from typing import ( - Any, - Awaitable, - Callable, - Deque, - Iterable, - List, - Optional, - Set, - Tuple, - TypeVar, - Union, - cast, -) - -from .application.current import get_app -from .application.run_in_terminal import run_in_terminal -from .auto_suggest import AutoSuggest, Suggestion -from .cache import FastDictCache -from .clipboard import ClipboardData -from .completion import ( - CompleteEvent, - Completer, - Completion, - DummyCompleter, - get_common_complete_suffix, -) -from .document import Document -from .filters import FilterOrBool, to_filter -from .history import History, InMemoryHistory -from .search import SearchDirection, SearchState -from .selection import PasteMode, SelectionState, SelectionType -from .utils import Event, to_str -from .validation import ValidationError, Validator - -__all__ = [ - "EditReadOnlyBuffer", - "Buffer", - "CompletionState", - "indent", - "unindent", - "reshape_text", -] - -logger = logging.getLogger(__name__) - - -class EditReadOnlyBuffer(Exception): - "Attempt editing of read-only :class:`.Buffer`." - - -class ValidationState(Enum): - "The validation state of a buffer. This is set after the validation." - VALID = "VALID" - INVALID = "INVALID" - UNKNOWN = "UNKNOWN" - - -class CompletionState: - """ - Immutable class that contains a completion state. - """ - - def __init__( - self, - original_document: "Document", - completions: Optional[List["Completion"]] = None, - complete_index: Optional[int] = None, - ) -> None: - - #: Document as it was when the completion started. - self.original_document = original_document - - #: List of all the current Completion instances which are possible at - #: this point. - self.completions = completions or [] - - #: Position in the `completions` array. - #: This can be `None` to indicate "no completion", the original text. - self.complete_index = complete_index # Position in the `_completions` array. - - def __repr__(self) -> str: - return "%s(%r, <%r> completions, index=%r)" % ( - self.__class__.__name__, - self.original_document, - len(self.completions), - self.complete_index, - ) - - def go_to_index(self, index: Optional[int]) -> None: - """ - Create a new :class:`.CompletionState` object with the new index. - - When `index` is `None` deselect the completion. - """ - if self.completions: - assert index is None or 0 <= index < len(self.completions) - self.complete_index = index - - def new_text_and_position(self) -> Tuple[str, int]: - """ - Return (new_text, new_cursor_position) for this completion. - """ - if self.complete_index is None: - return self.original_document.text, self.original_document.cursor_position - else: - original_text_before_cursor = self.original_document.text_before_cursor - original_text_after_cursor = self.original_document.text_after_cursor - - c = self.completions[self.complete_index] - if c.start_position == 0: - before = original_text_before_cursor - else: - before = original_text_before_cursor[: c.start_position] - - new_text = before + c.text + original_text_after_cursor - new_cursor_position = len(before) + len(c.text) - return new_text, new_cursor_position - - @property - def current_completion(self) -> Optional["Completion"]: - """ - Return the current completion, or return `None` when no completion is - selected. - """ - if self.complete_index is not None: - return self.completions[self.complete_index] - return None - - -_QUOTED_WORDS_RE = re.compile(r"""(\s+|".*?"|'.*?')""") - - -class YankNthArgState: - """ - For yank-last-arg/yank-nth-arg: Keep track of where we are in the history. - """ - - def __init__( - self, history_position: int = 0, n: int = -1, previous_inserted_word: str = "" - ) -> None: - - self.history_position = history_position - self.previous_inserted_word = previous_inserted_word - self.n = n - - def __repr__(self) -> str: - return "%s(history_position=%r, n=%r, previous_inserted_word=%r)" % ( - self.__class__.__name__, - self.history_position, - self.n, - self.previous_inserted_word, - ) - - -BufferEventHandler = Callable[["Buffer"], None] -BufferAcceptHandler = Callable[["Buffer"], bool] - - -class Buffer: - """ - The core data structure that holds the text and cursor position of the - current input line and implements all text manipulations on top of it. It - also implements the history, undo stack and the completion state. - - :param completer: :class:`~prompt_toolkit.completion.Completer` instance. - :param history: :class:`~prompt_toolkit.history.History` instance. - :param tempfile_suffix: The tempfile suffix (extension) to be used for the - "open in editor" function. For a Python REPL, this would be ".py", so - that the editor knows the syntax highlighting to use. This can also be - a callable that returns a string. - :param tempfile: For more advanced tempfile situations where you need - control over the subdirectories and filename. For a Git Commit Message, - this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax - highlighting to use. This can also be a callable that returns a string. - :param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly - useful for key bindings where we sometimes prefer to refer to a buffer - by their name instead of by reference. - :param accept_handler: Called when the buffer input is accepted. (Usually - when the user presses `enter`.) The accept handler receives this - `Buffer` as input and should return True when the buffer text should be - kept instead of calling reset. - - In case of a `PromptSession` for instance, we want to keep the text, - because we will exit the application, and only reset it during the next - run. - - Events: - - :param on_text_changed: When the buffer text changes. (Callable or None.) - :param on_text_insert: When new text is inserted. (Callable or None.) - :param on_cursor_position_changed: When the cursor moves. (Callable or None.) - :param on_completions_changed: When the completions were changed. (Callable or None.) - :param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.) - - Filters: - - :param complete_while_typing: :class:`~prompt_toolkit.filters.Filter` - or `bool`. Decide whether or not to do asynchronous autocompleting while - typing. - :param validate_while_typing: :class:`~prompt_toolkit.filters.Filter` - or `bool`. Decide whether or not to do asynchronous validation while - typing. - :param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or - `bool` to indicate when up-arrow partial string matching is enabled. It - is advised to not enable this at the same time as - `complete_while_typing`, because when there is an autocompletion found, - the up arrows usually browse through the completions, rather than - through the history. - :param read_only: :class:`~prompt_toolkit.filters.Filter`. When True, - changes will not be allowed. - :param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When - not set, pressing `Enter` will call the `accept_handler`. Otherwise, - pressing `Esc-Enter` is required. - """ - - def __init__( - self, - completer: Optional[Completer] = None, - auto_suggest: Optional[AutoSuggest] = None, - history: Optional[History] = None, - validator: Optional[Validator] = None, - tempfile_suffix: Union[str, Callable[[], str]] = "", - tempfile: Union[str, Callable[[], str]] = "", - name: str = "", - complete_while_typing: FilterOrBool = False, - validate_while_typing: FilterOrBool = False, - enable_history_search: FilterOrBool = False, - document: Optional[Document] = None, - accept_handler: Optional[BufferAcceptHandler] = None, - read_only: FilterOrBool = False, - multiline: FilterOrBool = True, - on_text_changed: Optional[BufferEventHandler] = None, - on_text_insert: Optional[BufferEventHandler] = None, - on_cursor_position_changed: Optional[BufferEventHandler] = None, - on_completions_changed: Optional[BufferEventHandler] = None, - on_suggestion_set: Optional[BufferEventHandler] = None, - ): - - # Accept both filters and booleans as input. - enable_history_search = to_filter(enable_history_search) - complete_while_typing = to_filter(complete_while_typing) - validate_while_typing = to_filter(validate_while_typing) - read_only = to_filter(read_only) - multiline = to_filter(multiline) - - self.completer = completer or DummyCompleter() - self.auto_suggest = auto_suggest - self.validator = validator - self.tempfile_suffix = tempfile_suffix - self.tempfile = tempfile - self.name = name - self.accept_handler = accept_handler - - # Filters. (Usually, used by the key bindings to drive the buffer.) - self.complete_while_typing = complete_while_typing - self.validate_while_typing = validate_while_typing - self.enable_history_search = enable_history_search - self.read_only = read_only - self.multiline = multiline - - # Text width. (For wrapping, used by the Vi 'gq' operator.) - self.text_width = 0 - - #: The command buffer history. - # Note that we shouldn't use a lazy 'or' here. bool(history) could be - # False when empty. - self.history = InMemoryHistory() if history is None else history - - self.__cursor_position = 0 - - # Events - self.on_text_changed: Event["Buffer"] = Event(self, on_text_changed) - self.on_text_insert: Event["Buffer"] = Event(self, on_text_insert) - self.on_cursor_position_changed: Event["Buffer"] = Event( - self, on_cursor_position_changed - ) - self.on_completions_changed: Event["Buffer"] = Event( - self, on_completions_changed - ) - self.on_suggestion_set: Event["Buffer"] = Event(self, on_suggestion_set) - - # Document cache. (Avoid creating new Document instances.) - self._document_cache: FastDictCache[ - Tuple[str, int, Optional[SelectionState]], Document - ] = FastDictCache(Document, size=10) - - # Create completer / auto suggestion / validation coroutines. - self._async_suggester = self._create_auto_suggest_coroutine() - self._async_completer = self._create_completer_coroutine() - self._async_validator = self._create_auto_validate_coroutine() - - # Asyncio task for populating the history. - self._load_history_task: Optional[asyncio.Future[None]] = None - - # Reset other attributes. - self.reset(document=document) - - def __repr__(self) -> str: - if len(self.text) < 15: - text = self.text - else: - text = self.text[:12] + "..." - - return "<Buffer(name=%r, text=%r) at %r>" % (self.name, text, id(self)) - - def reset( - self, document: Optional[Document] = None, append_to_history: bool = False - ) -> None: - """ - :param append_to_history: Append current input to history first. - """ - if append_to_history: - self.append_to_history() - - document = document or Document() - - self.__cursor_position = document.cursor_position - - # `ValidationError` instance. (Will be set when the input is wrong.) - self.validation_error: Optional[ValidationError] = None - self.validation_state: Optional[ValidationState] = ValidationState.UNKNOWN - - # State of the selection. - self.selection_state: Optional[SelectionState] = None - - # Multiple cursor mode. (When we press 'I' or 'A' in visual-block mode, - # we can insert text on multiple lines at once. This is implemented by - # using multiple cursors.) - self.multiple_cursor_positions: List[int] = [] - - # When doing consecutive up/down movements, prefer to stay at this column. - self.preferred_column: Optional[int] = None - - # State of complete browser - # For interactive completion through Ctrl-N/Ctrl-P. - self.complete_state: Optional[CompletionState] = None - - # State of Emacs yank-nth-arg completion. - self.yank_nth_arg_state: Optional[YankNthArgState] = None # for yank-nth-arg. - - # Remember the document that we had *right before* the last paste - # operation. This is used for rotating through the kill ring. - self.document_before_paste: Optional[Document] = None - - # Current suggestion. - self.suggestion: Optional[Suggestion] = None - - # The history search text. (Used for filtering the history when we - # browse through it.) - self.history_search_text: Optional[str] = None - - # Undo/redo stacks (stack of `(text, cursor_position)`). - self._undo_stack: List[Tuple[str, int]] = [] - self._redo_stack: List[Tuple[str, int]] = [] - - # Cancel history loader. If history loading was still ongoing. - # Cancel the `_load_history_task`, so that next repaint of the - # `BufferControl` we will repopulate it. - if self._load_history_task is not None: - self._load_history_task.cancel() - self._load_history_task = None - - #: The working lines. Similar to history, except that this can be - #: modified. The user can press arrow_up and edit previous entries. - #: Ctrl-C should reset this, and copy the whole history back in here. - #: Enter should process the current command and append to the real - #: history. - self._working_lines: Deque[str] = deque([document.text]) - self.__working_index = 0 - - def load_history_if_not_yet_loaded(self) -> None: - """ - Create task for populating the buffer history (if not yet done). - - Note:: - - This needs to be called from within the event loop of the - application, because history loading is async, and we need to be - sure the right event loop is active. Therefor, we call this method - in the `BufferControl.create_content`. - - There are situations where prompt_toolkit applications are created - in one thread, but will later run in a different thread (Ptpython - is one example. The REPL runs in a separate thread, in order to - prevent interfering with a potential different event loop in the - main thread. The REPL UI however is still created in the main - thread.) We could decide to not support creating prompt_toolkit - objects in one thread and running the application in a different - thread, but history loading is the only place where it matters, and - this solves it. - """ - if self._load_history_task is None: - - async def load_history() -> None: - async for item in self.history.load(): - self._working_lines.appendleft(item) - self.__working_index += 1 - - self._load_history_task = get_app().create_background_task(load_history()) - - def load_history_done(f: "asyncio.Future[None]") -> None: - """ - Handle `load_history` result when either done, cancelled, or - when an exception was raised. - """ - try: - f.result() - except asyncio.CancelledError: - # Ignore cancellation. But handle it, so that we don't get - # this traceback. - pass - except GeneratorExit: - # Probably not needed, but we had situations where - # `GeneratorExit` was raised in `load_history` during - # cancellation. - pass - except BaseException: - # Log error if something goes wrong. (We don't have a - # caller to which we can propagate this exception.) - logger.exception("Loading history failed") - - self._load_history_task.add_done_callback(load_history_done) - - # <getters/setters> - - def _set_text(self, value: str) -> bool: - """set text at current working_index. Return whether it changed.""" - working_index = self.working_index - working_lines = self._working_lines - - original_value = working_lines[working_index] - working_lines[working_index] = value - - # Return True when this text has been changed. - if len(value) != len(original_value): - # For Python 2, it seems that when two strings have a different - # length and one is a prefix of the other, Python still scans - # character by character to see whether the strings are different. - # (Some benchmarking showed significant differences for big - # documents. >100,000 of lines.) - return True - elif value != original_value: - return True - return False - - def _set_cursor_position(self, value: int) -> bool: - """Set cursor position. Return whether it changed.""" - original_position = self.__cursor_position - self.__cursor_position = max(0, value) - - return self.__cursor_position != original_position - - @property - def text(self) -> str: - return self._working_lines[self.working_index] - - @text.setter - def text(self, value: str) -> None: - """ - Setting text. (When doing this, make sure that the cursor_position is - valid for this text. text/cursor_position should be consistent at any time, - otherwise set a Document instead.) - """ - # Ensure cursor position remains within the size of the text. - if self.cursor_position > len(value): - self.cursor_position = len(value) - - # Don't allow editing of read-only buffers. - if self.read_only(): - raise EditReadOnlyBuffer() - - changed = self._set_text(value) - - if changed: - self._text_changed() - - # Reset history search text. - # (Note that this doesn't need to happen when working_index - # changes, which is when we traverse the history. That's why we - # don't do this in `self._text_changed`.) - self.history_search_text = None - - @property - def cursor_position(self) -> int: - return self.__cursor_position - - @cursor_position.setter - def cursor_position(self, value: int) -> None: - """ - Setting cursor position. - """ - assert isinstance(value, int) - - # Ensure cursor position is within the size of the text. - if value > len(self.text): - value = len(self.text) - if value < 0: - value = 0 - - changed = self._set_cursor_position(value) - - if changed: - self._cursor_position_changed() - - @property - def working_index(self) -> int: - return self.__working_index - - @working_index.setter - def working_index(self, value: int) -> None: - if self.__working_index != value: - self.__working_index = value - # Make sure to reset the cursor position, otherwise we end up in - # situations where the cursor position is out of the bounds of the - # text. - self.cursor_position = 0 - self._text_changed() - - def _text_changed(self) -> None: - # Remove any validation errors and complete state. - self.validation_error = None - self.validation_state = ValidationState.UNKNOWN - self.complete_state = None - self.yank_nth_arg_state = None - self.document_before_paste = None - self.selection_state = None - self.suggestion = None - self.preferred_column = None - - # fire 'on_text_changed' event. - self.on_text_changed.fire() - - # Input validation. - # (This happens on all change events, unlike auto completion, also when - # deleting text.) - if self.validator and self.validate_while_typing(): - get_app().create_background_task(self._async_validator()) - - def _cursor_position_changed(self) -> None: - # Remove any complete state. - # (Input validation should only be undone when the cursor position - # changes.) - self.complete_state = None - self.yank_nth_arg_state = None - self.document_before_paste = None - - # Unset preferred_column. (Will be set after the cursor movement, if - # required.) - self.preferred_column = None - - # Note that the cursor position can change if we have a selection the - # new position of the cursor determines the end of the selection. - - # fire 'on_cursor_position_changed' event. - self.on_cursor_position_changed.fire() - - @property - def document(self) -> Document: - """ - Return :class:`~prompt_toolkit.document.Document` instance from the - current text, cursor position and selection state. - """ - return self._document_cache[ - self.text, self.cursor_position, self.selection_state - ] - - @document.setter - def document(self, value: Document) -> None: - """ - Set :class:`~prompt_toolkit.document.Document` instance. - - This will set both the text and cursor position at the same time, but - atomically. (Change events will be triggered only after both have been set.) - """ - self.set_document(value) - - def set_document(self, value: Document, bypass_readonly: bool = False) -> None: - """ - Set :class:`~prompt_toolkit.document.Document` instance. Like the - ``document`` property, but accept an ``bypass_readonly`` argument. - - :param bypass_readonly: When True, don't raise an - :class:`.EditReadOnlyBuffer` exception, even - when the buffer is read-only. - - .. warning:: - - When this buffer is read-only and `bypass_readonly` was not passed, - the `EditReadOnlyBuffer` exception will be caught by the - `KeyProcessor` and is silently suppressed. This is important to - keep in mind when writing key bindings, because it won't do what - you expect, and there won't be a stack trace. Use try/finally - around this function if you need some cleanup code. - """ - # Don't allow editing of read-only buffers. - if not bypass_readonly and self.read_only(): - raise EditReadOnlyBuffer() - - # Set text and cursor position first. - text_changed = self._set_text(value.text) - cursor_position_changed = self._set_cursor_position(value.cursor_position) - - # Now handle change events. (We do this when text/cursor position is - # both set and consistent.) - if text_changed: - self._text_changed() - self.history_search_text = None - - if cursor_position_changed: - self._cursor_position_changed() - - @property - def is_returnable(self) -> bool: - """ - True when there is something handling accept. - """ - return bool(self.accept_handler) - - # End of <getters/setters> - - def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None: - """ - Safe current state (input text and cursor position), so that we can - restore it by calling undo. - """ - # Safe if the text is different from the text at the top of the stack - # is different. If the text is the same, just update the cursor position. - if self._undo_stack and self._undo_stack[-1][0] == self.text: - self._undo_stack[-1] = (self._undo_stack[-1][0], self.cursor_position) - else: - self._undo_stack.append((self.text, self.cursor_position)) - - # Saving anything to the undo stack, clears the redo stack. - if clear_redo_stack: - self._redo_stack = [] - - def transform_lines( - self, - line_index_iterator: Iterable[int], - transform_callback: Callable[[str], str], - ) -> str: - """ - Transforms the text on a range of lines. - When the iterator yield an index not in the range of lines that the - document contains, it skips them silently. - - To uppercase some lines:: - - new_text = transform_lines(range(5,10), lambda text: text.upper()) - - :param line_index_iterator: Iterator of line numbers (int) - :param transform_callback: callable that takes the original text of a - line, and return the new text for this line. - - :returns: The new text. - """ - # Split lines - lines = self.text.split("\n") - - # Apply transformation - for index in line_index_iterator: - try: - lines[index] = transform_callback(lines[index]) - except IndexError: - pass - - return "\n".join(lines) - - def transform_current_line(self, transform_callback: Callable[[str], str]) -> None: - """ - Apply the given transformation function to the current line. - - :param transform_callback: callable that takes a string and return a new string. - """ - document = self.document - a = document.cursor_position + document.get_start_of_line_position() - b = document.cursor_position + document.get_end_of_line_position() - self.text = ( - document.text[:a] - + transform_callback(document.text[a:b]) - + document.text[b:] - ) - - def transform_region( - self, from_: int, to: int, transform_callback: Callable[[str], str] - ) -> None: - """ - Transform a part of the input string. - - :param from_: (int) start position. - :param to: (int) end position. - :param transform_callback: Callable which accepts a string and returns - the transformed string. - """ - assert from_ < to - - self.text = "".join( - [ - self.text[:from_] - + transform_callback(self.text[from_:to]) - + self.text[to:] - ] - ) - - def cursor_left(self, count: int = 1) -> None: - self.cursor_position += self.document.get_cursor_left_position(count=count) - - def cursor_right(self, count: int = 1) -> None: - self.cursor_position += self.document.get_cursor_right_position(count=count) - - def cursor_up(self, count: int = 1) -> None: - """(for multiline edit). Move cursor to the previous line.""" - original_column = self.preferred_column or self.document.cursor_position_col - self.cursor_position += self.document.get_cursor_up_position( - count=count, preferred_column=original_column - ) - - # Remember the original column for the next up/down movement. - self.preferred_column = original_column - - def cursor_down(self, count: int = 1) -> None: - """(for multiline edit). Move cursor to the next line.""" - original_column = self.preferred_column or self.document.cursor_position_col - self.cursor_position += self.document.get_cursor_down_position( - count=count, preferred_column=original_column - ) - - # Remember the original column for the next up/down movement. - self.preferred_column = original_column - - def auto_up( - self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False - ) -> None: - """ - If we're not on the first line (of a multiline input) go a line up, - otherwise go back in history. (If nothing is selected.) - """ - if self.complete_state: - self.complete_previous(count=count) - elif self.document.cursor_position_row > 0: - self.cursor_up(count=count) - elif not self.selection_state: - self.history_backward(count=count) - - # Go to the start of the line? - if go_to_start_of_line_if_history_changes: - self.cursor_position += self.document.get_start_of_line_position() - - def auto_down( - self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False - ) -> None: - """ - If we're not on the last line (of a multiline input) go a line down, - otherwise go forward in history. (If nothing is selected.) - """ - if self.complete_state: - self.complete_next(count=count) - elif self.document.cursor_position_row < self.document.line_count - 1: - self.cursor_down(count=count) - elif not self.selection_state: - self.history_forward(count=count) - - # Go to the start of the line? - if go_to_start_of_line_if_history_changes: - self.cursor_position += self.document.get_start_of_line_position() - - def delete_before_cursor(self, count: int = 1) -> str: - """ - Delete specified number of characters before cursor and return the - deleted text. - """ - assert count >= 0 - deleted = "" - - if self.cursor_position > 0: - deleted = self.text[self.cursor_position - count : self.cursor_position] - - new_text = ( - self.text[: self.cursor_position - count] - + self.text[self.cursor_position :] - ) - new_cursor_position = self.cursor_position - len(deleted) - - # Set new Document atomically. - self.document = Document(new_text, new_cursor_position) - - return deleted - - def delete(self, count: int = 1) -> str: - """ - Delete specified number of characters and Return the deleted text. - """ - if self.cursor_position < len(self.text): - deleted = self.document.text_after_cursor[:count] - self.text = ( - self.text[: self.cursor_position] - + self.text[self.cursor_position + len(deleted) :] - ) - return deleted - else: - return "" - - def join_next_line(self, separator: str = " ") -> None: - """ - Join the next line to the current one by deleting the line ending after - the current line. - """ - if not self.document.on_last_line: - self.cursor_position += self.document.get_end_of_line_position() - self.delete() - - # Remove spaces. - self.text = ( - self.document.text_before_cursor - + separator - + self.document.text_after_cursor.lstrip(" ") - ) - - def join_selected_lines(self, separator: str = " ") -> None: - """ - Join the selected lines. - """ - assert self.selection_state - - # Get lines. - from_, to = sorted( - [self.cursor_position, self.selection_state.original_cursor_position] - ) - - before = self.text[:from_] - lines = self.text[from_:to].splitlines() - after = self.text[to:] - - # Replace leading spaces with just one space. - lines = [l.lstrip(" ") + separator for l in lines] - - # Set new document. - self.document = Document( - text=before + "".join(lines) + after, - cursor_position=len(before + "".join(lines[:-1])) - 1, - ) - - def swap_characters_before_cursor(self) -> None: - """ - Swap the last two characters before the cursor. - """ - pos = self.cursor_position - - if pos >= 2: - a = self.text[pos - 2] - b = self.text[pos - 1] - - self.text = self.text[: pos - 2] + b + a + self.text[pos:] - - def go_to_history(self, index: int) -> None: - """ - Go to this item in the history. - """ - if index < len(self._working_lines): - self.working_index = index - self.cursor_position = len(self.text) - - def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None: - """ - Browse to the next completions. - (Does nothing if there are no completion.) - """ - index: Optional[int] - - if self.complete_state: - completions_count = len(self.complete_state.completions) - - if self.complete_state.complete_index is None: - index = 0 - elif self.complete_state.complete_index == completions_count - 1: - index = None - - if disable_wrap_around: - return - else: - index = min( - completions_count - 1, self.complete_state.complete_index + count - ) - self.go_to_completion(index) - - def complete_previous( - self, count: int = 1, disable_wrap_around: bool = False - ) -> None: - """ - Browse to the previous completions. - (Does nothing if there are no completion.) - """ - index: Optional[int] - - if self.complete_state: - if self.complete_state.complete_index == 0: - index = None - - if disable_wrap_around: - return - elif self.complete_state.complete_index is None: - index = len(self.complete_state.completions) - 1 - else: - index = max(0, self.complete_state.complete_index - count) - - self.go_to_completion(index) - - def cancel_completion(self) -> None: - """ - Cancel completion, go back to the original text. - """ - if self.complete_state: - self.go_to_completion(None) - self.complete_state = None - - def _set_completions(self, completions: List[Completion]) -> CompletionState: - """ - Start completions. (Generate list of completions and initialize.) - - By default, no completion will be selected. - """ - self.complete_state = CompletionState( - original_document=self.document, completions=completions - ) - - # Trigger event. This should eventually invalidate the layout. - self.on_completions_changed.fire() - - return self.complete_state - - def start_history_lines_completion(self) -> None: - """ - Start a completion based on all the other lines in the document and the - history. - """ - found_completions: Set[str] = set() - completions = [] - - # For every line of the whole history, find matches with the current line. - current_line = self.document.current_line_before_cursor.lstrip() - - for i, string in enumerate(self._working_lines): - for j, l in enumerate(string.split("\n")): - l = l.strip() - if l and l.startswith(current_line): - # When a new line has been found. - if l not in found_completions: - found_completions.add(l) - - # Create completion. - if i == self.working_index: - display_meta = "Current, line %s" % (j + 1) - else: - display_meta = "History %s, line %s" % (i + 1, j + 1) - - completions.append( - Completion( +""" +Data structures for the Buffer. +It holds the text, cursor position, history, etc... +""" +import asyncio +import logging +import os +import re +import shlex +import shutil +import subprocess +import tempfile +from collections import deque +from enum import Enum +from functools import wraps +from typing import ( + Any, + Awaitable, + Callable, + Deque, + Iterable, + List, + Optional, + Set, + Tuple, + TypeVar, + Union, + cast, +) + +from .application.current import get_app +from .application.run_in_terminal import run_in_terminal +from .auto_suggest import AutoSuggest, Suggestion +from .cache import FastDictCache +from .clipboard import ClipboardData +from .completion import ( + CompleteEvent, + Completer, + Completion, + DummyCompleter, + get_common_complete_suffix, +) +from .document import Document +from .filters import FilterOrBool, to_filter +from .history import History, InMemoryHistory +from .search import SearchDirection, SearchState +from .selection import PasteMode, SelectionState, SelectionType +from .utils import Event, to_str +from .validation import ValidationError, Validator + +__all__ = [ + "EditReadOnlyBuffer", + "Buffer", + "CompletionState", + "indent", + "unindent", + "reshape_text", +] + +logger = logging.getLogger(__name__) + + +class EditReadOnlyBuffer(Exception): + "Attempt editing of read-only :class:`.Buffer`." + + +class ValidationState(Enum): + "The validation state of a buffer. This is set after the validation." + VALID = "VALID" + INVALID = "INVALID" + UNKNOWN = "UNKNOWN" + + +class CompletionState: + """ + Immutable class that contains a completion state. + """ + + def __init__( + self, + original_document: "Document", + completions: Optional[List["Completion"]] = None, + complete_index: Optional[int] = None, + ) -> None: + + #: Document as it was when the completion started. + self.original_document = original_document + + #: List of all the current Completion instances which are possible at + #: this point. + self.completions = completions or [] + + #: Position in the `completions` array. + #: This can be `None` to indicate "no completion", the original text. + self.complete_index = complete_index # Position in the `_completions` array. + + def __repr__(self) -> str: + return "%s(%r, <%r> completions, index=%r)" % ( + self.__class__.__name__, + self.original_document, + len(self.completions), + self.complete_index, + ) + + def go_to_index(self, index: Optional[int]) -> None: + """ + Create a new :class:`.CompletionState` object with the new index. + + When `index` is `None` deselect the completion. + """ + if self.completions: + assert index is None or 0 <= index < len(self.completions) + self.complete_index = index + + def new_text_and_position(self) -> Tuple[str, int]: + """ + Return (new_text, new_cursor_position) for this completion. + """ + if self.complete_index is None: + return self.original_document.text, self.original_document.cursor_position + else: + original_text_before_cursor = self.original_document.text_before_cursor + original_text_after_cursor = self.original_document.text_after_cursor + + c = self.completions[self.complete_index] + if c.start_position == 0: + before = original_text_before_cursor + else: + before = original_text_before_cursor[: c.start_position] + + new_text = before + c.text + original_text_after_cursor + new_cursor_position = len(before) + len(c.text) + return new_text, new_cursor_position + + @property + def current_completion(self) -> Optional["Completion"]: + """ + Return the current completion, or return `None` when no completion is + selected. + """ + if self.complete_index is not None: + return self.completions[self.complete_index] + return None + + +_QUOTED_WORDS_RE = re.compile(r"""(\s+|".*?"|'.*?')""") + + +class YankNthArgState: + """ + For yank-last-arg/yank-nth-arg: Keep track of where we are in the history. + """ + + def __init__( + self, history_position: int = 0, n: int = -1, previous_inserted_word: str = "" + ) -> None: + + self.history_position = history_position + self.previous_inserted_word = previous_inserted_word + self.n = n + + def __repr__(self) -> str: + return "%s(history_position=%r, n=%r, previous_inserted_word=%r)" % ( + self.__class__.__name__, + self.history_position, + self.n, + self.previous_inserted_word, + ) + + +BufferEventHandler = Callable[["Buffer"], None] +BufferAcceptHandler = Callable[["Buffer"], bool] + + +class Buffer: + """ + The core data structure that holds the text and cursor position of the + current input line and implements all text manipulations on top of it. It + also implements the history, undo stack and the completion state. + + :param completer: :class:`~prompt_toolkit.completion.Completer` instance. + :param history: :class:`~prompt_toolkit.history.History` instance. + :param tempfile_suffix: The tempfile suffix (extension) to be used for the + "open in editor" function. For a Python REPL, this would be ".py", so + that the editor knows the syntax highlighting to use. This can also be + a callable that returns a string. + :param tempfile: For more advanced tempfile situations where you need + control over the subdirectories and filename. For a Git Commit Message, + this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax + highlighting to use. This can also be a callable that returns a string. + :param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly + useful for key bindings where we sometimes prefer to refer to a buffer + by their name instead of by reference. + :param accept_handler: Called when the buffer input is accepted. (Usually + when the user presses `enter`.) The accept handler receives this + `Buffer` as input and should return True when the buffer text should be + kept instead of calling reset. + + In case of a `PromptSession` for instance, we want to keep the text, + because we will exit the application, and only reset it during the next + run. + + Events: + + :param on_text_changed: When the buffer text changes. (Callable or None.) + :param on_text_insert: When new text is inserted. (Callable or None.) + :param on_cursor_position_changed: When the cursor moves. (Callable or None.) + :param on_completions_changed: When the completions were changed. (Callable or None.) + :param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.) + + Filters: + + :param complete_while_typing: :class:`~prompt_toolkit.filters.Filter` + or `bool`. Decide whether or not to do asynchronous autocompleting while + typing. + :param validate_while_typing: :class:`~prompt_toolkit.filters.Filter` + or `bool`. Decide whether or not to do asynchronous validation while + typing. + :param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or + `bool` to indicate when up-arrow partial string matching is enabled. It + is advised to not enable this at the same time as + `complete_while_typing`, because when there is an autocompletion found, + the up arrows usually browse through the completions, rather than + through the history. + :param read_only: :class:`~prompt_toolkit.filters.Filter`. When True, + changes will not be allowed. + :param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When + not set, pressing `Enter` will call the `accept_handler`. Otherwise, + pressing `Esc-Enter` is required. + """ + + def __init__( + self, + completer: Optional[Completer] = None, + auto_suggest: Optional[AutoSuggest] = None, + history: Optional[History] = None, + validator: Optional[Validator] = None, + tempfile_suffix: Union[str, Callable[[], str]] = "", + tempfile: Union[str, Callable[[], str]] = "", + name: str = "", + complete_while_typing: FilterOrBool = False, + validate_while_typing: FilterOrBool = False, + enable_history_search: FilterOrBool = False, + document: Optional[Document] = None, + accept_handler: Optional[BufferAcceptHandler] = None, + read_only: FilterOrBool = False, + multiline: FilterOrBool = True, + on_text_changed: Optional[BufferEventHandler] = None, + on_text_insert: Optional[BufferEventHandler] = None, + on_cursor_position_changed: Optional[BufferEventHandler] = None, + on_completions_changed: Optional[BufferEventHandler] = None, + on_suggestion_set: Optional[BufferEventHandler] = None, + ): + + # Accept both filters and booleans as input. + enable_history_search = to_filter(enable_history_search) + complete_while_typing = to_filter(complete_while_typing) + validate_while_typing = to_filter(validate_while_typing) + read_only = to_filter(read_only) + multiline = to_filter(multiline) + + self.completer = completer or DummyCompleter() + self.auto_suggest = auto_suggest + self.validator = validator + self.tempfile_suffix = tempfile_suffix + self.tempfile = tempfile + self.name = name + self.accept_handler = accept_handler + + # Filters. (Usually, used by the key bindings to drive the buffer.) + self.complete_while_typing = complete_while_typing + self.validate_while_typing = validate_while_typing + self.enable_history_search = enable_history_search + self.read_only = read_only + self.multiline = multiline + + # Text width. (For wrapping, used by the Vi 'gq' operator.) + self.text_width = 0 + + #: The command buffer history. + # Note that we shouldn't use a lazy 'or' here. bool(history) could be + # False when empty. + self.history = InMemoryHistory() if history is None else history + + self.__cursor_position = 0 + + # Events + self.on_text_changed: Event["Buffer"] = Event(self, on_text_changed) + self.on_text_insert: Event["Buffer"] = Event(self, on_text_insert) + self.on_cursor_position_changed: Event["Buffer"] = Event( + self, on_cursor_position_changed + ) + self.on_completions_changed: Event["Buffer"] = Event( + self, on_completions_changed + ) + self.on_suggestion_set: Event["Buffer"] = Event(self, on_suggestion_set) + + # Document cache. (Avoid creating new Document instances.) + self._document_cache: FastDictCache[ + Tuple[str, int, Optional[SelectionState]], Document + ] = FastDictCache(Document, size=10) + + # Create completer / auto suggestion / validation coroutines. + self._async_suggester = self._create_auto_suggest_coroutine() + self._async_completer = self._create_completer_coroutine() + self._async_validator = self._create_auto_validate_coroutine() + + # Asyncio task for populating the history. + self._load_history_task: Optional[asyncio.Future[None]] = None + + # Reset other attributes. + self.reset(document=document) + + def __repr__(self) -> str: + if len(self.text) < 15: + text = self.text + else: + text = self.text[:12] + "..." + + return "<Buffer(name=%r, text=%r) at %r>" % (self.name, text, id(self)) + + def reset( + self, document: Optional[Document] = None, append_to_history: bool = False + ) -> None: + """ + :param append_to_history: Append current input to history first. + """ + if append_to_history: + self.append_to_history() + + document = document or Document() + + self.__cursor_position = document.cursor_position + + # `ValidationError` instance. (Will be set when the input is wrong.) + self.validation_error: Optional[ValidationError] = None + self.validation_state: Optional[ValidationState] = ValidationState.UNKNOWN + + # State of the selection. + self.selection_state: Optional[SelectionState] = None + + # Multiple cursor mode. (When we press 'I' or 'A' in visual-block mode, + # we can insert text on multiple lines at once. This is implemented by + # using multiple cursors.) + self.multiple_cursor_positions: List[int] = [] + + # When doing consecutive up/down movements, prefer to stay at this column. + self.preferred_column: Optional[int] = None + + # State of complete browser + # For interactive completion through Ctrl-N/Ctrl-P. + self.complete_state: Optional[CompletionState] = None + + # State of Emacs yank-nth-arg completion. + self.yank_nth_arg_state: Optional[YankNthArgState] = None # for yank-nth-arg. + + # Remember the document that we had *right before* the last paste + # operation. This is used for rotating through the kill ring. + self.document_before_paste: Optional[Document] = None + + # Current suggestion. + self.suggestion: Optional[Suggestion] = None + + # The history search text. (Used for filtering the history when we + # browse through it.) + self.history_search_text: Optional[str] = None + + # Undo/redo stacks (stack of `(text, cursor_position)`). + self._undo_stack: List[Tuple[str, int]] = [] + self._redo_stack: List[Tuple[str, int]] = [] + + # Cancel history loader. If history loading was still ongoing. + # Cancel the `_load_history_task`, so that next repaint of the + # `BufferControl` we will repopulate it. + if self._load_history_task is not None: + self._load_history_task.cancel() + self._load_history_task = None + + #: The working lines. Similar to history, except that this can be + #: modified. The user can press arrow_up and edit previous entries. + #: Ctrl-C should reset this, and copy the whole history back in here. + #: Enter should process the current command and append to the real + #: history. + self._working_lines: Deque[str] = deque([document.text]) + self.__working_index = 0 + + def load_history_if_not_yet_loaded(self) -> None: + """ + Create task for populating the buffer history (if not yet done). + + Note:: + + This needs to be called from within the event loop of the + application, because history loading is async, and we need to be + sure the right event loop is active. Therefor, we call this method + in the `BufferControl.create_content`. + + There are situations where prompt_toolkit applications are created + in one thread, but will later run in a different thread (Ptpython + is one example. The REPL runs in a separate thread, in order to + prevent interfering with a potential different event loop in the + main thread. The REPL UI however is still created in the main + thread.) We could decide to not support creating prompt_toolkit + objects in one thread and running the application in a different + thread, but history loading is the only place where it matters, and + this solves it. + """ + if self._load_history_task is None: + + async def load_history() -> None: + async for item in self.history.load(): + self._working_lines.appendleft(item) + self.__working_index += 1 + + self._load_history_task = get_app().create_background_task(load_history()) + + def load_history_done(f: "asyncio.Future[None]") -> None: + """ + Handle `load_history` result when either done, cancelled, or + when an exception was raised. + """ + try: + f.result() + except asyncio.CancelledError: + # Ignore cancellation. But handle it, so that we don't get + # this traceback. + pass + except GeneratorExit: + # Probably not needed, but we had situations where + # `GeneratorExit` was raised in `load_history` during + # cancellation. + pass + except BaseException: + # Log error if something goes wrong. (We don't have a + # caller to which we can propagate this exception.) + logger.exception("Loading history failed") + + self._load_history_task.add_done_callback(load_history_done) + + # <getters/setters> + + def _set_text(self, value: str) -> bool: + """set text at current working_index. Return whether it changed.""" + working_index = self.working_index + working_lines = self._working_lines + + original_value = working_lines[working_index] + working_lines[working_index] = value + + # Return True when this text has been changed. + if len(value) != len(original_value): + # For Python 2, it seems that when two strings have a different + # length and one is a prefix of the other, Python still scans + # character by character to see whether the strings are different. + # (Some benchmarking showed significant differences for big + # documents. >100,000 of lines.) + return True + elif value != original_value: + return True + return False + + def _set_cursor_position(self, value: int) -> bool: + """Set cursor position. Return whether it changed.""" + original_position = self.__cursor_position + self.__cursor_position = max(0, value) + + return self.__cursor_position != original_position + + @property + def text(self) -> str: + return self._working_lines[self.working_index] + + @text.setter + def text(self, value: str) -> None: + """ + Setting text. (When doing this, make sure that the cursor_position is + valid for this text. text/cursor_position should be consistent at any time, + otherwise set a Document instead.) + """ + # Ensure cursor position remains within the size of the text. + if self.cursor_position > len(value): + self.cursor_position = len(value) + + # Don't allow editing of read-only buffers. + if self.read_only(): + raise EditReadOnlyBuffer() + + changed = self._set_text(value) + + if changed: + self._text_changed() + + # Reset history search text. + # (Note that this doesn't need to happen when working_index + # changes, which is when we traverse the history. That's why we + # don't do this in `self._text_changed`.) + self.history_search_text = None + + @property + def cursor_position(self) -> int: + return self.__cursor_position + + @cursor_position.setter + def cursor_position(self, value: int) -> None: + """ + Setting cursor position. + """ + assert isinstance(value, int) + + # Ensure cursor position is within the size of the text. + if value > len(self.text): + value = len(self.text) + if value < 0: + value = 0 + + changed = self._set_cursor_position(value) + + if changed: + self._cursor_position_changed() + + @property + def working_index(self) -> int: + return self.__working_index + + @working_index.setter + def working_index(self, value: int) -> None: + if self.__working_index != value: + self.__working_index = value + # Make sure to reset the cursor position, otherwise we end up in + # situations where the cursor position is out of the bounds of the + # text. + self.cursor_position = 0 + self._text_changed() + + def _text_changed(self) -> None: + # Remove any validation errors and complete state. + self.validation_error = None + self.validation_state = ValidationState.UNKNOWN + self.complete_state = None + self.yank_nth_arg_state = None + self.document_before_paste = None + self.selection_state = None + self.suggestion = None + self.preferred_column = None + + # fire 'on_text_changed' event. + self.on_text_changed.fire() + + # Input validation. + # (This happens on all change events, unlike auto completion, also when + # deleting text.) + if self.validator and self.validate_while_typing(): + get_app().create_background_task(self._async_validator()) + + def _cursor_position_changed(self) -> None: + # Remove any complete state. + # (Input validation should only be undone when the cursor position + # changes.) + self.complete_state = None + self.yank_nth_arg_state = None + self.document_before_paste = None + + # Unset preferred_column. (Will be set after the cursor movement, if + # required.) + self.preferred_column = None + + # Note that the cursor position can change if we have a selection the + # new position of the cursor determines the end of the selection. + + # fire 'on_cursor_position_changed' event. + self.on_cursor_position_changed.fire() + + @property + def document(self) -> Document: + """ + Return :class:`~prompt_toolkit.document.Document` instance from the + current text, cursor position and selection state. + """ + return self._document_cache[ + self.text, self.cursor_position, self.selection_state + ] + + @document.setter + def document(self, value: Document) -> None: + """ + Set :class:`~prompt_toolkit.document.Document` instance. + + This will set both the text and cursor position at the same time, but + atomically. (Change events will be triggered only after both have been set.) + """ + self.set_document(value) + + def set_document(self, value: Document, bypass_readonly: bool = False) -> None: + """ + Set :class:`~prompt_toolkit.document.Document` instance. Like the + ``document`` property, but accept an ``bypass_readonly`` argument. + + :param bypass_readonly: When True, don't raise an + :class:`.EditReadOnlyBuffer` exception, even + when the buffer is read-only. + + .. warning:: + + When this buffer is read-only and `bypass_readonly` was not passed, + the `EditReadOnlyBuffer` exception will be caught by the + `KeyProcessor` and is silently suppressed. This is important to + keep in mind when writing key bindings, because it won't do what + you expect, and there won't be a stack trace. Use try/finally + around this function if you need some cleanup code. + """ + # Don't allow editing of read-only buffers. + if not bypass_readonly and self.read_only(): + raise EditReadOnlyBuffer() + + # Set text and cursor position first. + text_changed = self._set_text(value.text) + cursor_position_changed = self._set_cursor_position(value.cursor_position) + + # Now handle change events. (We do this when text/cursor position is + # both set and consistent.) + if text_changed: + self._text_changed() + self.history_search_text = None + + if cursor_position_changed: + self._cursor_position_changed() + + @property + def is_returnable(self) -> bool: + """ + True when there is something handling accept. + """ + return bool(self.accept_handler) + + # End of <getters/setters> + + def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None: + """ + Safe current state (input text and cursor position), so that we can + restore it by calling undo. + """ + # Safe if the text is different from the text at the top of the stack + # is different. If the text is the same, just update the cursor position. + if self._undo_stack and self._undo_stack[-1][0] == self.text: + self._undo_stack[-1] = (self._undo_stack[-1][0], self.cursor_position) + else: + self._undo_stack.append((self.text, self.cursor_position)) + + # Saving anything to the undo stack, clears the redo stack. + if clear_redo_stack: + self._redo_stack = [] + + def transform_lines( + self, + line_index_iterator: Iterable[int], + transform_callback: Callable[[str], str], + ) -> str: + """ + Transforms the text on a range of lines. + When the iterator yield an index not in the range of lines that the + document contains, it skips them silently. + + To uppercase some lines:: + + new_text = transform_lines(range(5,10), lambda text: text.upper()) + + :param line_index_iterator: Iterator of line numbers (int) + :param transform_callback: callable that takes the original text of a + line, and return the new text for this line. + + :returns: The new text. + """ + # Split lines + lines = self.text.split("\n") + + # Apply transformation + for index in line_index_iterator: + try: + lines[index] = transform_callback(lines[index]) + except IndexError: + pass + + return "\n".join(lines) + + def transform_current_line(self, transform_callback: Callable[[str], str]) -> None: + """ + Apply the given transformation function to the current line. + + :param transform_callback: callable that takes a string and return a new string. + """ + document = self.document + a = document.cursor_position + document.get_start_of_line_position() + b = document.cursor_position + document.get_end_of_line_position() + self.text = ( + document.text[:a] + + transform_callback(document.text[a:b]) + + document.text[b:] + ) + + def transform_region( + self, from_: int, to: int, transform_callback: Callable[[str], str] + ) -> None: + """ + Transform a part of the input string. + + :param from_: (int) start position. + :param to: (int) end position. + :param transform_callback: Callable which accepts a string and returns + the transformed string. + """ + assert from_ < to + + self.text = "".join( + [ + self.text[:from_] + + transform_callback(self.text[from_:to]) + + self.text[to:] + ] + ) + + def cursor_left(self, count: int = 1) -> None: + self.cursor_position += self.document.get_cursor_left_position(count=count) + + def cursor_right(self, count: int = 1) -> None: + self.cursor_position += self.document.get_cursor_right_position(count=count) + + def cursor_up(self, count: int = 1) -> None: + """(for multiline edit). Move cursor to the previous line.""" + original_column = self.preferred_column or self.document.cursor_position_col + self.cursor_position += self.document.get_cursor_up_position( + count=count, preferred_column=original_column + ) + + # Remember the original column for the next up/down movement. + self.preferred_column = original_column + + def cursor_down(self, count: int = 1) -> None: + """(for multiline edit). Move cursor to the next line.""" + original_column = self.preferred_column or self.document.cursor_position_col + self.cursor_position += self.document.get_cursor_down_position( + count=count, preferred_column=original_column + ) + + # Remember the original column for the next up/down movement. + self.preferred_column = original_column + + def auto_up( + self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False + ) -> None: + """ + If we're not on the first line (of a multiline input) go a line up, + otherwise go back in history. (If nothing is selected.) + """ + if self.complete_state: + self.complete_previous(count=count) + elif self.document.cursor_position_row > 0: + self.cursor_up(count=count) + elif not self.selection_state: + self.history_backward(count=count) + + # Go to the start of the line? + if go_to_start_of_line_if_history_changes: + self.cursor_position += self.document.get_start_of_line_position() + + def auto_down( + self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False + ) -> None: + """ + If we're not on the last line (of a multiline input) go a line down, + otherwise go forward in history. (If nothing is selected.) + """ + if self.complete_state: + self.complete_next(count=count) + elif self.document.cursor_position_row < self.document.line_count - 1: + self.cursor_down(count=count) + elif not self.selection_state: + self.history_forward(count=count) + + # Go to the start of the line? + if go_to_start_of_line_if_history_changes: + self.cursor_position += self.document.get_start_of_line_position() + + def delete_before_cursor(self, count: int = 1) -> str: + """ + Delete specified number of characters before cursor and return the + deleted text. + """ + assert count >= 0 + deleted = "" + + if self.cursor_position > 0: + deleted = self.text[self.cursor_position - count : self.cursor_position] + + new_text = ( + self.text[: self.cursor_position - count] + + self.text[self.cursor_position :] + ) + new_cursor_position = self.cursor_position - len(deleted) + + # Set new Document atomically. + self.document = Document(new_text, new_cursor_position) + + return deleted + + def delete(self, count: int = 1) -> str: + """ + Delete specified number of characters and Return the deleted text. + """ + if self.cursor_position < len(self.text): + deleted = self.document.text_after_cursor[:count] + self.text = ( + self.text[: self.cursor_position] + + self.text[self.cursor_position + len(deleted) :] + ) + return deleted + else: + return "" + + def join_next_line(self, separator: str = " ") -> None: + """ + Join the next line to the current one by deleting the line ending after + the current line. + """ + if not self.document.on_last_line: + self.cursor_position += self.document.get_end_of_line_position() + self.delete() + + # Remove spaces. + self.text = ( + self.document.text_before_cursor + + separator + + self.document.text_after_cursor.lstrip(" ") + ) + + def join_selected_lines(self, separator: str = " ") -> None: + """ + Join the selected lines. + """ + assert self.selection_state + + # Get lines. + from_, to = sorted( + [self.cursor_position, self.selection_state.original_cursor_position] + ) + + before = self.text[:from_] + lines = self.text[from_:to].splitlines() + after = self.text[to:] + + # Replace leading spaces with just one space. + lines = [l.lstrip(" ") + separator for l in lines] + + # Set new document. + self.document = Document( + text=before + "".join(lines) + after, + cursor_position=len(before + "".join(lines[:-1])) - 1, + ) + + def swap_characters_before_cursor(self) -> None: + """ + Swap the last two characters before the cursor. + """ + pos = self.cursor_position + + if pos >= 2: + a = self.text[pos - 2] + b = self.text[pos - 1] + + self.text = self.text[: pos - 2] + b + a + self.text[pos:] + + def go_to_history(self, index: int) -> None: + """ + Go to this item in the history. + """ + if index < len(self._working_lines): + self.working_index = index + self.cursor_position = len(self.text) + + def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None: + """ + Browse to the next completions. + (Does nothing if there are no completion.) + """ + index: Optional[int] + + if self.complete_state: + completions_count = len(self.complete_state.completions) + + if self.complete_state.complete_index is None: + index = 0 + elif self.complete_state.complete_index == completions_count - 1: + index = None + + if disable_wrap_around: + return + else: + index = min( + completions_count - 1, self.complete_state.complete_index + count + ) + self.go_to_completion(index) + + def complete_previous( + self, count: int = 1, disable_wrap_around: bool = False + ) -> None: + """ + Browse to the previous completions. + (Does nothing if there are no completion.) + """ + index: Optional[int] + + if self.complete_state: + if self.complete_state.complete_index == 0: + index = None + + if disable_wrap_around: + return + elif self.complete_state.complete_index is None: + index = len(self.complete_state.completions) - 1 + else: + index = max(0, self.complete_state.complete_index - count) + + self.go_to_completion(index) + + def cancel_completion(self) -> None: + """ + Cancel completion, go back to the original text. + """ + if self.complete_state: + self.go_to_completion(None) + self.complete_state = None + + def _set_completions(self, completions: List[Completion]) -> CompletionState: + """ + Start completions. (Generate list of completions and initialize.) + + By default, no completion will be selected. + """ + self.complete_state = CompletionState( + original_document=self.document, completions=completions + ) + + # Trigger event. This should eventually invalidate the layout. + self.on_completions_changed.fire() + + return self.complete_state + + def start_history_lines_completion(self) -> None: + """ + Start a completion based on all the other lines in the document and the + history. + """ + found_completions: Set[str] = set() + completions = [] + + # For every line of the whole history, find matches with the current line. + current_line = self.document.current_line_before_cursor.lstrip() + + for i, string in enumerate(self._working_lines): + for j, l in enumerate(string.split("\n")): + l = l.strip() + if l and l.startswith(current_line): + # When a new line has been found. + if l not in found_completions: + found_completions.add(l) + + # Create completion. + if i == self.working_index: + display_meta = "Current, line %s" % (j + 1) + else: + display_meta = "History %s, line %s" % (i + 1, j + 1) + + completions.append( + Completion( text=l, - start_position=-len(current_line), - display_meta=display_meta, - ) - ) - - self._set_completions(completions=completions[::-1]) - self.go_to_completion(0) - - def go_to_completion(self, index: Optional[int]) -> None: - """ - Select a completion from the list of current completions. - """ - assert self.complete_state - - # Set new completion - state = self.complete_state - state.go_to_index(index) - - # Set text/cursor position - new_text, new_cursor_position = state.new_text_and_position() - self.document = Document(new_text, new_cursor_position) - - # (changing text/cursor position will unset complete_state.) - self.complete_state = state - - def apply_completion(self, completion: Completion) -> None: - """ - Insert a given completion. - """ - # If there was already a completion active, cancel that one. - if self.complete_state: - self.go_to_completion(None) - self.complete_state = None - - # Insert text from the given completion. - self.delete_before_cursor(-completion.start_position) - self.insert_text(completion.text) - - def _set_history_search(self) -> None: - """ - Set `history_search_text`. - (The text before the cursor will be used for filtering the history.) - """ - if self.enable_history_search(): - if self.history_search_text is None: - self.history_search_text = self.document.text_before_cursor - else: - self.history_search_text = None - - def _history_matches(self, i: int) -> bool: - """ - True when the current entry matches the history search. - (when we don't have history search, it's also True.) - """ - return self.history_search_text is None or self._working_lines[i].startswith( - self.history_search_text - ) - - def history_forward(self, count: int = 1) -> None: - """ - Move forwards through the history. - - :param count: Amount of items to move forward. - """ - self._set_history_search() - - # Go forward in history. - found_something = False - - for i in range(self.working_index + 1, len(self._working_lines)): - if self._history_matches(i): - self.working_index = i - count -= 1 - found_something = True - if count == 0: - break - - # If we found an entry, move cursor to the end of the first line. - if found_something: - self.cursor_position = 0 - self.cursor_position += self.document.get_end_of_line_position() - - def history_backward(self, count: int = 1) -> None: - """ - Move backwards through history. - """ - self._set_history_search() - - # Go back in history. - found_something = False - - for i in range(self.working_index - 1, -1, -1): - if self._history_matches(i): - self.working_index = i - count -= 1 - found_something = True - if count == 0: - break - - # If we move to another entry, move cursor to the end of the line. - if found_something: - self.cursor_position = len(self.text) - - def yank_nth_arg( - self, n: Optional[int] = None, _yank_last_arg: bool = False - ) -> None: - """ - Pick nth word from previous history entry (depending on current - `yank_nth_arg_state`) and insert it at current position. Rotate through - history if called repeatedly. If no `n` has been given, take the first - argument. (The second word.) - - :param n: (None or int), The index of the word from the previous line - to take. - """ - assert n is None or isinstance(n, int) - history_strings = self.history.get_strings() - - if not len(history_strings): - return - - # Make sure we have a `YankNthArgState`. - if self.yank_nth_arg_state is None: - state = YankNthArgState(n=-1 if _yank_last_arg else 1) - else: - state = self.yank_nth_arg_state - - if n is not None: - state.n = n - - # Get new history position. - new_pos = state.history_position - 1 - if -new_pos > len(history_strings): - new_pos = -1 - - # Take argument from line. - line = history_strings[new_pos] - - words = [w.strip() for w in _QUOTED_WORDS_RE.split(line)] - words = [w for w in words if w] - try: - word = words[state.n] - except IndexError: - word = "" - - # Insert new argument. - if state.previous_inserted_word: - self.delete_before_cursor(len(state.previous_inserted_word)) - self.insert_text(word) - - # Save state again for next completion. (Note that the 'insert' - # operation from above clears `self.yank_nth_arg_state`.) - state.previous_inserted_word = word - state.history_position = new_pos - self.yank_nth_arg_state = state - - def yank_last_arg(self, n: Optional[int] = None) -> None: - """ - Like `yank_nth_arg`, but if no argument has been given, yank the last - word by default. - """ - self.yank_nth_arg(n=n, _yank_last_arg=True) - - def start_selection( - self, selection_type: SelectionType = SelectionType.CHARACTERS - ) -> None: - """ - Take the current cursor position as the start of this selection. - """ - self.selection_state = SelectionState(self.cursor_position, selection_type) - - def copy_selection(self, _cut: bool = False) -> ClipboardData: - """ - Copy selected text and return :class:`.ClipboardData` instance. - - Notice that this doesn't store the copied data on the clipboard yet. - You can store it like this: - - .. code:: python - - data = buffer.copy_selection() - get_app().clipboard.set_data(data) - """ - new_document, clipboard_data = self.document.cut_selection() - if _cut: - self.document = new_document - - self.selection_state = None - return clipboard_data - - def cut_selection(self) -> ClipboardData: - """ - Delete selected text and return :class:`.ClipboardData` instance. - """ - return self.copy_selection(_cut=True) - - def paste_clipboard_data( - self, - data: ClipboardData, - paste_mode: PasteMode = PasteMode.EMACS, - count: int = 1, - ) -> None: - """ - Insert the data from the clipboard. - """ - assert isinstance(data, ClipboardData) - assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS) - - original_document = self.document - self.document = self.document.paste_clipboard_data( - data, paste_mode=paste_mode, count=count - ) - - # Remember original document. This assignment should come at the end, - # because assigning to 'document' will erase it. - self.document_before_paste = original_document - - def newline(self, copy_margin: bool = True) -> None: - """ - Insert a line ending at the current position. - """ - if copy_margin: - self.insert_text("\n" + self.document.leading_whitespace_in_current_line) - else: - self.insert_text("\n") - - def insert_line_above(self, copy_margin: bool = True) -> None: - """ - Insert a new line above the current one. - """ - if copy_margin: - insert = self.document.leading_whitespace_in_current_line + "\n" - else: - insert = "\n" - - self.cursor_position += self.document.get_start_of_line_position() - self.insert_text(insert) - self.cursor_position -= 1 - - def insert_line_below(self, copy_margin: bool = True) -> None: - """ - Insert a new line below the current one. - """ - if copy_margin: - insert = "\n" + self.document.leading_whitespace_in_current_line - else: - insert = "\n" - - self.cursor_position += self.document.get_end_of_line_position() - self.insert_text(insert) - - def insert_text( - self, - data: str, - overwrite: bool = False, - move_cursor: bool = True, - fire_event: bool = True, - ) -> None: - """ - Insert characters at cursor position. - - :param fire_event: Fire `on_text_insert` event. This is mainly used to - trigger autocompletion while typing. - """ - # Original text & cursor position. - otext = self.text - ocpos = self.cursor_position - - # In insert/text mode. - if overwrite: - # Don't overwrite the newline itself. Just before the line ending, - # it should act like insert mode. - overwritten_text = otext[ocpos : ocpos + len(data)] - if "\n" in overwritten_text: - overwritten_text = overwritten_text[: overwritten_text.find("\n")] - - text = otext[:ocpos] + data + otext[ocpos + len(overwritten_text) :] - else: - text = otext[:ocpos] + data + otext[ocpos:] - - if move_cursor: - cpos = self.cursor_position + len(data) - else: - cpos = self.cursor_position - - # Set new document. - # (Set text and cursor position at the same time. Otherwise, setting - # the text will fire a change event before the cursor position has been - # set. It works better to have this atomic.) - self.document = Document(text, cpos) - - # Fire 'on_text_insert' event. - if fire_event: # XXX: rename to `start_complete`. - self.on_text_insert.fire() - - # Only complete when "complete_while_typing" is enabled. - if self.completer and self.complete_while_typing(): - get_app().create_background_task(self._async_completer()) - - # Call auto_suggest. - if self.auto_suggest: - get_app().create_background_task(self._async_suggester()) - - def undo(self) -> None: - # Pop from the undo-stack until we find a text that if different from - # the current text. (The current logic of `save_to_undo_stack` will - # cause that the top of the undo stack is usually the same as the - # current text, so in that case we have to pop twice.) - while self._undo_stack: - text, pos = self._undo_stack.pop() - - if text != self.text: - # Push current text to redo stack. - self._redo_stack.append((self.text, self.cursor_position)) - - # Set new text/cursor_position. - self.document = Document(text, cursor_position=pos) - break - - def redo(self) -> None: - if self._redo_stack: - # Copy current state on undo stack. - self.save_to_undo_stack(clear_redo_stack=False) - - # Pop state from redo stack. - text, pos = self._redo_stack.pop() - self.document = Document(text, cursor_position=pos) - - def validate(self, set_cursor: bool = False) -> bool: - """ - Returns `True` if valid. - - :param set_cursor: Set the cursor position, if an error was found. - """ - # Don't call the validator again, if it was already called for the - # current input. - if self.validation_state != ValidationState.UNKNOWN: - return self.validation_state == ValidationState.VALID - - # Call validator. - if self.validator: - try: - self.validator.validate(self.document) - except ValidationError as e: - # Set cursor position (don't allow invalid values.) - if set_cursor: - self.cursor_position = min( - max(0, e.cursor_position), len(self.text) - ) - - self.validation_state = ValidationState.INVALID - self.validation_error = e - return False - - # Handle validation result. - self.validation_state = ValidationState.VALID - self.validation_error = None - return True - - async def _validate_async(self) -> None: - """ - Asynchronous version of `validate()`. - This one doesn't set the cursor position. - - We have both variants, because a synchronous version is required. - Handling the ENTER key needs to be completely synchronous, otherwise - stuff like type-ahead is going to give very weird results. (People - could type input while the ENTER key is still processed.) - - An asynchronous version is required if we have `validate_while_typing` - enabled. - """ - while True: - # Don't call the validator again, if it was already called for the - # current input. - if self.validation_state != ValidationState.UNKNOWN: - return - - # Call validator. - error = None - document = self.document - - if self.validator: - try: - await self.validator.validate_async(self.document) - except ValidationError as e: - error = e - - # If the document changed during the validation, try again. - if self.document != document: - continue - - # Handle validation result. - if error: - self.validation_state = ValidationState.INVALID - else: - self.validation_state = ValidationState.VALID - - self.validation_error = error - get_app().invalidate() # Trigger redraw (display error). - - def append_to_history(self) -> None: - """ - Append the current input to the history. - """ - # Save at the tail of the history. (But don't if the last entry the - # history is already the same.) - if self.text: - history_strings = self.history.get_strings() - if not len(history_strings) or history_strings[-1] != self.text: - self.history.append_string(self.text) - - def _search( - self, - search_state: SearchState, - include_current_position: bool = False, - count: int = 1, - ) -> Optional[Tuple[int, int]]: - """ - Execute search. Return (working_index, cursor_position) tuple when this - search is applied. Returns `None` when this text cannot be found. - """ - assert count > 0 - - text = search_state.text - direction = search_state.direction - ignore_case = search_state.ignore_case() - - def search_once( - working_index: int, document: Document - ) -> Optional[Tuple[int, Document]]: - """ - Do search one time. - Return (working_index, document) or `None` - """ - if direction == SearchDirection.FORWARD: - # Try find at the current input. - new_index = document.find( - text, - include_current_position=include_current_position, - ignore_case=ignore_case, - ) - - if new_index is not None: - return ( - working_index, - Document(document.text, document.cursor_position + new_index), - ) - else: - # No match, go forward in the history. (Include len+1 to wrap around.) - # (Here we should always include all cursor positions, because - # it's a different line.) - for i in range(working_index + 1, len(self._working_lines) + 1): - i %= len(self._working_lines) - - document = Document(self._working_lines[i], 0) - new_index = document.find( - text, include_current_position=True, ignore_case=ignore_case - ) - if new_index is not None: - return (i, Document(document.text, new_index)) - else: - # Try find at the current input. - new_index = document.find_backwards(text, ignore_case=ignore_case) - - if new_index is not None: - return ( - working_index, - Document(document.text, document.cursor_position + new_index), - ) - else: - # No match, go back in the history. (Include -1 to wrap around.) - for i in range(working_index - 1, -2, -1): - i %= len(self._working_lines) - - document = Document( - self._working_lines[i], len(self._working_lines[i]) - ) - new_index = document.find_backwards( - text, ignore_case=ignore_case - ) - if new_index is not None: - return ( - i, - Document(document.text, len(document.text) + new_index), - ) - return None - - # Do 'count' search iterations. - working_index = self.working_index - document = self.document - for _ in range(count): - result = search_once(working_index, document) - if result is None: - return None # Nothing found. - else: - working_index, document = result - - return (working_index, document.cursor_position) - - def document_for_search(self, search_state: SearchState) -> Document: - """ - Return a :class:`~prompt_toolkit.document.Document` instance that has - the text/cursor position for this search, if we would apply it. This - will be used in the - :class:`~prompt_toolkit.layout.BufferControl` to display feedback while - searching. - """ - search_result = self._search(search_state, include_current_position=True) - - if search_result is None: - return self.document - else: - working_index, cursor_position = search_result - - # Keep selection, when `working_index` was not changed. - if working_index == self.working_index: - selection = self.selection_state - else: - selection = None - - return Document( - self._working_lines[working_index], cursor_position, selection=selection - ) - - def get_search_position( - self, - search_state: SearchState, - include_current_position: bool = True, - count: int = 1, - ) -> int: - """ - Get the cursor position for this search. - (This operation won't change the `working_index`. It's won't go through - the history. Vi text objects can't span multiple items.) - """ - search_result = self._search( - search_state, include_current_position=include_current_position, count=count - ) - - if search_result is None: - return self.cursor_position - else: - working_index, cursor_position = search_result - return cursor_position - - def apply_search( - self, - search_state: SearchState, - include_current_position: bool = True, - count: int = 1, - ) -> None: - """ - Apply search. If something is found, set `working_index` and - `cursor_position`. - """ - search_result = self._search( - search_state, include_current_position=include_current_position, count=count - ) - - if search_result is not None: - working_index, cursor_position = search_result - self.working_index = working_index - self.cursor_position = cursor_position - - def exit_selection(self) -> None: - self.selection_state = None - - def _editor_simple_tempfile(self) -> Tuple[str, Callable[[], None]]: - """ - Simple (file) tempfile implementation. - Return (tempfile, cleanup_func). - """ - suffix = to_str(self.tempfile_suffix) - descriptor, filename = tempfile.mkstemp(suffix) - - os.write(descriptor, self.text.encode("utf-8")) - os.close(descriptor) - - def cleanup() -> None: - os.unlink(filename) - - return filename, cleanup - - def _editor_complex_tempfile(self) -> Tuple[str, Callable[[], None]]: - # Complex (directory) tempfile implementation. - headtail = to_str(self.tempfile) - if not headtail: - # Revert to simple case. - return self._editor_simple_tempfile() - headtail = str(headtail) - - # Try to make according to tempfile logic. - head, tail = os.path.split(headtail) - if os.path.isabs(head): - head = head[1:] - - dirpath = tempfile.mkdtemp() - if head: - dirpath = os.path.join(dirpath, head) - # Assume there is no issue creating dirs in this temp dir. - os.makedirs(dirpath) - - # Open the filename and write current text. - filename = os.path.join(dirpath, tail) - with open(filename, "w", encoding="utf-8") as fh: - fh.write(self.text) - - def cleanup() -> None: - shutil.rmtree(dirpath) - - return filename, cleanup - - def open_in_editor(self, validate_and_handle: bool = False) -> "asyncio.Task[None]": - """ - Open code in editor. - - This returns a future, and runs in a thread executor. - """ - if self.read_only(): - raise EditReadOnlyBuffer() - - # Write current text to temporary file - if self.tempfile: - filename, cleanup_func = self._editor_complex_tempfile() - else: - filename, cleanup_func = self._editor_simple_tempfile() - - async def run() -> None: - try: - # Open in editor - # (We need to use `run_in_terminal`, because not all editors go to - # the alternate screen buffer, and some could influence the cursor - # position.) - succes = await run_in_terminal( - lambda: self._open_file_in_editor(filename), in_executor=True - ) - - # Read content again. - if succes: - with open(filename, "rb") as f: - text = f.read().decode("utf-8") - - # Drop trailing newline. (Editors are supposed to add it at the - # end, but we don't need it.) - if text.endswith("\n"): - text = text[:-1] - - self.document = Document(text=text, cursor_position=len(text)) - - # Accept the input. - if validate_and_handle: - self.validate_and_handle() - - finally: - # Clean up temp dir/file. - cleanup_func() - - return get_app().create_background_task(run()) - - def _open_file_in_editor(self, filename: str) -> bool: - """ - Call editor executable. - - Return True when we received a zero return code. - """ - # If the 'VISUAL' or 'EDITOR' environment variable has been set, use that. - # Otherwise, fall back to the first available editor that we can find. - visual = os.environ.get("VISUAL") - editor = os.environ.get("EDITOR") - - editors = [ - visual, - editor, - # Order of preference. - "/usr/bin/editor", - "/usr/bin/nano", - "/usr/bin/pico", - "/usr/bin/vi", - "/usr/bin/emacs", - ] - - for e in editors: - if e: - try: - # Use 'shlex.split()', because $VISUAL can contain spaces - # and quotes. - returncode = subprocess.call(shlex.split(e) + [filename]) - return returncode == 0 - - except OSError: - # Executable does not exist, try the next one. - pass - - return False - - def start_completion( - self, - select_first: bool = False, - select_last: bool = False, - insert_common_part: bool = False, - complete_event: Optional[CompleteEvent] = None, - ) -> None: - """ - Start asynchronous autocompletion of this buffer. - (This will do nothing if a previous completion was still in progress.) - """ - # Only one of these options can be selected. - assert select_first + select_last + insert_common_part <= 1 - - get_app().create_background_task( - self._async_completer( - select_first=select_first, - select_last=select_last, - insert_common_part=insert_common_part, - complete_event=complete_event - or CompleteEvent(completion_requested=True), - ) - ) - - def _create_completer_coroutine(self) -> Callable[..., Awaitable[None]]: - """ - Create function for asynchronous autocompletion. - - (This consumes the asynchronous completer generator, which possibly - runs the completion algorithm in another thread.) - """ - - def completion_does_nothing(document: Document, completion: Completion) -> bool: - """ - Return `True` if applying this completion doesn't have any effect. - (When it doesn't insert any new text. - """ - text_before_cursor = document.text_before_cursor - replaced_text = text_before_cursor[ - len(text_before_cursor) + completion.start_position : - ] - return replaced_text == completion.text - - @_only_one_at_a_time - async def async_completer( - select_first: bool = False, - select_last: bool = False, - insert_common_part: bool = False, - complete_event: Optional[CompleteEvent] = None, - ) -> None: - - document = self.document - complete_event = complete_event or CompleteEvent(text_inserted=True) - - # Don't complete when we already have completions. - if self.complete_state or not self.completer: - return - - # Create an empty CompletionState. - complete_state = CompletionState(original_document=self.document) - self.complete_state = complete_state - - def proceed() -> bool: - """Keep retrieving completions. Input text has not yet changed - while generating completions.""" - return self.complete_state == complete_state - - async for completion in self.completer.get_completions_async( - document, complete_event - ): - complete_state.completions.append(completion) - self.on_completions_changed.fire() - - # If the input text changes, abort. - if not proceed(): - break - - completions = complete_state.completions - - # When there is only one completion, which has nothing to add, ignore it. - if len(completions) == 1 and completion_does_nothing( - document, completions[0] - ): - del completions[:] - - # Set completions if the text was not yet changed. - if proceed(): - # When no completions were found, or when the user selected - # already a completion by using the arrow keys, don't do anything. - if ( - not self.complete_state - or self.complete_state.complete_index is not None - ): - return - - # When there are no completions, reset completion state anyway. - if not completions: - self.complete_state = None - # Render the ui if the completion menu was shown - # it is needed especially if there is one completion and it was deleted. - self.on_completions_changed.fire() - return - - # Select first/last or insert common part, depending on the key - # binding. (For this we have to wait until all completions are - # loaded.) - - if select_first: - self.go_to_completion(0) - - elif select_last: - self.go_to_completion(len(completions) - 1) - - elif insert_common_part: - common_part = get_common_complete_suffix(document, completions) - if common_part: - # Insert the common part, update completions. - self.insert_text(common_part) - if len(completions) > 1: - # (Don't call `async_completer` again, but - # recalculate completions. See: - # https://github.com/ipython/ipython/issues/9658) - completions[:] = [ - c.new_completion_from_position(len(common_part)) - for c in completions - ] - - self._set_completions(completions=completions) - else: - self.complete_state = None - else: - # When we were asked to insert the "common" - # prefix, but there was no common suffix but - # still exactly one match, then select the - # first. (It could be that we have a completion - # which does * expansion, like '*.py', with - # exactly one match.) - if len(completions) == 1: - self.go_to_completion(0) - - else: - # If the last operation was an insert, (not a delete), restart - # the completion coroutine. - - if self.document.text_before_cursor == document.text_before_cursor: - return # Nothing changed. - - if self.document.text_before_cursor.startswith( - document.text_before_cursor - ): - raise _Retry - - return async_completer - - def _create_auto_suggest_coroutine(self) -> Callable[[], Awaitable[None]]: - """ - Create function for asynchronous auto suggestion. - (This can be in another thread.) - """ - - @_only_one_at_a_time - async def async_suggestor() -> None: - document = self.document - - # Don't suggest when we already have a suggestion. - if self.suggestion or not self.auto_suggest: - return - - suggestion = await self.auto_suggest.get_suggestion_async(self, document) - - # Set suggestion only if the text was not yet changed. - if self.document == document: - # Set suggestion and redraw interface. - self.suggestion = suggestion - self.on_suggestion_set.fire() - else: - # Otherwise, restart thread. - raise _Retry - - return async_suggestor - - def _create_auto_validate_coroutine(self) -> Callable[[], Awaitable[None]]: - """ - Create a function for asynchronous validation while typing. - (This can be in another thread.) - """ - - @_only_one_at_a_time - async def async_validator() -> None: - await self._validate_async() - - return async_validator - - def validate_and_handle(self) -> None: - """ - Validate buffer and handle the accept action. - """ - valid = self.validate(set_cursor=True) - - # When the validation succeeded, accept the input. - if valid: - if self.accept_handler: - keep_text = self.accept_handler(self) - else: - keep_text = False - - self.append_to_history() - - if not keep_text: - self.reset() - - -_T = TypeVar("_T", bound=Callable[..., Awaitable[None]]) - - -def _only_one_at_a_time(coroutine: _T) -> _T: - """ - Decorator that only starts the coroutine only if the previous call has - finished. (Used to make sure that we have only one autocompleter, auto - suggestor and validator running at a time.) - - When the coroutine raises `_Retry`, it is restarted. - """ - running = False - - @wraps(coroutine) - async def new_coroutine(*a: Any, **kw: Any) -> Any: - nonlocal running - - # Don't start a new function, if the previous is still in progress. - if running: - return - - running = True - - try: - while True: - try: - await coroutine(*a, **kw) - except _Retry: - continue - else: - return None - finally: - running = False - - return cast(_T, new_coroutine) - - -class _Retry(Exception): - "Retry in `_only_one_at_a_time`." - - -def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None: - """ - Indent text of a :class:`.Buffer` object. - """ - current_row = buffer.document.cursor_position_row - line_range = range(from_row, to_row) - - # Apply transformation. - new_text = buffer.transform_lines(line_range, lambda l: " " * count + l) - buffer.document = Document( - new_text, Document(new_text).translate_row_col_to_index(current_row, 0) - ) - - # Go to the start of the line. - buffer.cursor_position += buffer.document.get_start_of_line_position( - after_whitespace=True - ) - - -def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None: - """ - Unindent text of a :class:`.Buffer` object. - """ - current_row = buffer.document.cursor_position_row - line_range = range(from_row, to_row) - - def transform(text: str) -> str: - remove = " " * count - if text.startswith(remove): - return text[len(remove) :] - else: - return text.lstrip() - - # Apply transformation. - new_text = buffer.transform_lines(line_range, transform) - buffer.document = Document( - new_text, Document(new_text).translate_row_col_to_index(current_row, 0) - ) - - # Go to the start of the line. - buffer.cursor_position += buffer.document.get_start_of_line_position( - after_whitespace=True - ) - - -def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None: - """ - Reformat text, taking the width into account. - `to_row` is included. - (Vi 'gq' operator.) - """ - lines = buffer.text.splitlines(True) - lines_before = lines[:from_row] - lines_after = lines[to_row + 1 :] - lines_to_reformat = lines[from_row : to_row + 1] - - if lines_to_reformat: - # Take indentation from the first line. - match = re.search(r"^\s*", lines_to_reformat[0]) - length = match.end() if match else 0 # `match` can't be None, actually. - - indent = lines_to_reformat[0][:length].replace("\n", "") - - # Now, take all the 'words' from the lines to be reshaped. - words = "".join(lines_to_reformat).split() - - # And reshape. - width = (buffer.text_width or 80) - len(indent) - reshaped_text = [indent] - current_width = 0 - for w in words: - if current_width: - if len(w) + current_width + 1 > width: - reshaped_text.append("\n") - reshaped_text.append(indent) - current_width = 0 - else: - reshaped_text.append(" ") - current_width += 1 - - reshaped_text.append(w) - current_width += len(w) - - if reshaped_text[-1] != "\n": - reshaped_text.append("\n") - - # Apply result. - buffer.document = Document( - text="".join(lines_before + reshaped_text + lines_after), - cursor_position=len("".join(lines_before + reshaped_text)), - ) + start_position=-len(current_line), + display_meta=display_meta, + ) + ) + + self._set_completions(completions=completions[::-1]) + self.go_to_completion(0) + + def go_to_completion(self, index: Optional[int]) -> None: + """ + Select a completion from the list of current completions. + """ + assert self.complete_state + + # Set new completion + state = self.complete_state + state.go_to_index(index) + + # Set text/cursor position + new_text, new_cursor_position = state.new_text_and_position() + self.document = Document(new_text, new_cursor_position) + + # (changing text/cursor position will unset complete_state.) + self.complete_state = state + + def apply_completion(self, completion: Completion) -> None: + """ + Insert a given completion. + """ + # If there was already a completion active, cancel that one. + if self.complete_state: + self.go_to_completion(None) + self.complete_state = None + + # Insert text from the given completion. + self.delete_before_cursor(-completion.start_position) + self.insert_text(completion.text) + + def _set_history_search(self) -> None: + """ + Set `history_search_text`. + (The text before the cursor will be used for filtering the history.) + """ + if self.enable_history_search(): + if self.history_search_text is None: + self.history_search_text = self.document.text_before_cursor + else: + self.history_search_text = None + + def _history_matches(self, i: int) -> bool: + """ + True when the current entry matches the history search. + (when we don't have history search, it's also True.) + """ + return self.history_search_text is None or self._working_lines[i].startswith( + self.history_search_text + ) + + def history_forward(self, count: int = 1) -> None: + """ + Move forwards through the history. + + :param count: Amount of items to move forward. + """ + self._set_history_search() + + # Go forward in history. + found_something = False + + for i in range(self.working_index + 1, len(self._working_lines)): + if self._history_matches(i): + self.working_index = i + count -= 1 + found_something = True + if count == 0: + break + + # If we found an entry, move cursor to the end of the first line. + if found_something: + self.cursor_position = 0 + self.cursor_position += self.document.get_end_of_line_position() + + def history_backward(self, count: int = 1) -> None: + """ + Move backwards through history. + """ + self._set_history_search() + + # Go back in history. + found_something = False + + for i in range(self.working_index - 1, -1, -1): + if self._history_matches(i): + self.working_index = i + count -= 1 + found_something = True + if count == 0: + break + + # If we move to another entry, move cursor to the end of the line. + if found_something: + self.cursor_position = len(self.text) + + def yank_nth_arg( + self, n: Optional[int] = None, _yank_last_arg: bool = False + ) -> None: + """ + Pick nth word from previous history entry (depending on current + `yank_nth_arg_state`) and insert it at current position. Rotate through + history if called repeatedly. If no `n` has been given, take the first + argument. (The second word.) + + :param n: (None or int), The index of the word from the previous line + to take. + """ + assert n is None or isinstance(n, int) + history_strings = self.history.get_strings() + + if not len(history_strings): + return + + # Make sure we have a `YankNthArgState`. + if self.yank_nth_arg_state is None: + state = YankNthArgState(n=-1 if _yank_last_arg else 1) + else: + state = self.yank_nth_arg_state + + if n is not None: + state.n = n + + # Get new history position. + new_pos = state.history_position - 1 + if -new_pos > len(history_strings): + new_pos = -1 + + # Take argument from line. + line = history_strings[new_pos] + + words = [w.strip() for w in _QUOTED_WORDS_RE.split(line)] + words = [w for w in words if w] + try: + word = words[state.n] + except IndexError: + word = "" + + # Insert new argument. + if state.previous_inserted_word: + self.delete_before_cursor(len(state.previous_inserted_word)) + self.insert_text(word) + + # Save state again for next completion. (Note that the 'insert' + # operation from above clears `self.yank_nth_arg_state`.) + state.previous_inserted_word = word + state.history_position = new_pos + self.yank_nth_arg_state = state + + def yank_last_arg(self, n: Optional[int] = None) -> None: + """ + Like `yank_nth_arg`, but if no argument has been given, yank the last + word by default. + """ + self.yank_nth_arg(n=n, _yank_last_arg=True) + + def start_selection( + self, selection_type: SelectionType = SelectionType.CHARACTERS + ) -> None: + """ + Take the current cursor position as the start of this selection. + """ + self.selection_state = SelectionState(self.cursor_position, selection_type) + + def copy_selection(self, _cut: bool = False) -> ClipboardData: + """ + Copy selected text and return :class:`.ClipboardData` instance. + + Notice that this doesn't store the copied data on the clipboard yet. + You can store it like this: + + .. code:: python + + data = buffer.copy_selection() + get_app().clipboard.set_data(data) + """ + new_document, clipboard_data = self.document.cut_selection() + if _cut: + self.document = new_document + + self.selection_state = None + return clipboard_data + + def cut_selection(self) -> ClipboardData: + """ + Delete selected text and return :class:`.ClipboardData` instance. + """ + return self.copy_selection(_cut=True) + + def paste_clipboard_data( + self, + data: ClipboardData, + paste_mode: PasteMode = PasteMode.EMACS, + count: int = 1, + ) -> None: + """ + Insert the data from the clipboard. + """ + assert isinstance(data, ClipboardData) + assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS) + + original_document = self.document + self.document = self.document.paste_clipboard_data( + data, paste_mode=paste_mode, count=count + ) + + # Remember original document. This assignment should come at the end, + # because assigning to 'document' will erase it. + self.document_before_paste = original_document + + def newline(self, copy_margin: bool = True) -> None: + """ + Insert a line ending at the current position. + """ + if copy_margin: + self.insert_text("\n" + self.document.leading_whitespace_in_current_line) + else: + self.insert_text("\n") + + def insert_line_above(self, copy_margin: bool = True) -> None: + """ + Insert a new line above the current one. + """ + if copy_margin: + insert = self.document.leading_whitespace_in_current_line + "\n" + else: + insert = "\n" + + self.cursor_position += self.document.get_start_of_line_position() + self.insert_text(insert) + self.cursor_position -= 1 + + def insert_line_below(self, copy_margin: bool = True) -> None: + """ + Insert a new line below the current one. + """ + if copy_margin: + insert = "\n" + self.document.leading_whitespace_in_current_line + else: + insert = "\n" + + self.cursor_position += self.document.get_end_of_line_position() + self.insert_text(insert) + + def insert_text( + self, + data: str, + overwrite: bool = False, + move_cursor: bool = True, + fire_event: bool = True, + ) -> None: + """ + Insert characters at cursor position. + + :param fire_event: Fire `on_text_insert` event. This is mainly used to + trigger autocompletion while typing. + """ + # Original text & cursor position. + otext = self.text + ocpos = self.cursor_position + + # In insert/text mode. + if overwrite: + # Don't overwrite the newline itself. Just before the line ending, + # it should act like insert mode. + overwritten_text = otext[ocpos : ocpos + len(data)] + if "\n" in overwritten_text: + overwritten_text = overwritten_text[: overwritten_text.find("\n")] + + text = otext[:ocpos] + data + otext[ocpos + len(overwritten_text) :] + else: + text = otext[:ocpos] + data + otext[ocpos:] + + if move_cursor: + cpos = self.cursor_position + len(data) + else: + cpos = self.cursor_position + + # Set new document. + # (Set text and cursor position at the same time. Otherwise, setting + # the text will fire a change event before the cursor position has been + # set. It works better to have this atomic.) + self.document = Document(text, cpos) + + # Fire 'on_text_insert' event. + if fire_event: # XXX: rename to `start_complete`. + self.on_text_insert.fire() + + # Only complete when "complete_while_typing" is enabled. + if self.completer and self.complete_while_typing(): + get_app().create_background_task(self._async_completer()) + + # Call auto_suggest. + if self.auto_suggest: + get_app().create_background_task(self._async_suggester()) + + def undo(self) -> None: + # Pop from the undo-stack until we find a text that if different from + # the current text. (The current logic of `save_to_undo_stack` will + # cause that the top of the undo stack is usually the same as the + # current text, so in that case we have to pop twice.) + while self._undo_stack: + text, pos = self._undo_stack.pop() + + if text != self.text: + # Push current text to redo stack. + self._redo_stack.append((self.text, self.cursor_position)) + + # Set new text/cursor_position. + self.document = Document(text, cursor_position=pos) + break + + def redo(self) -> None: + if self._redo_stack: + # Copy current state on undo stack. + self.save_to_undo_stack(clear_redo_stack=False) + + # Pop state from redo stack. + text, pos = self._redo_stack.pop() + self.document = Document(text, cursor_position=pos) + + def validate(self, set_cursor: bool = False) -> bool: + """ + Returns `True` if valid. + + :param set_cursor: Set the cursor position, if an error was found. + """ + # Don't call the validator again, if it was already called for the + # current input. + if self.validation_state != ValidationState.UNKNOWN: + return self.validation_state == ValidationState.VALID + + # Call validator. + if self.validator: + try: + self.validator.validate(self.document) + except ValidationError as e: + # Set cursor position (don't allow invalid values.) + if set_cursor: + self.cursor_position = min( + max(0, e.cursor_position), len(self.text) + ) + + self.validation_state = ValidationState.INVALID + self.validation_error = e + return False + + # Handle validation result. + self.validation_state = ValidationState.VALID + self.validation_error = None + return True + + async def _validate_async(self) -> None: + """ + Asynchronous version of `validate()`. + This one doesn't set the cursor position. + + We have both variants, because a synchronous version is required. + Handling the ENTER key needs to be completely synchronous, otherwise + stuff like type-ahead is going to give very weird results. (People + could type input while the ENTER key is still processed.) + + An asynchronous version is required if we have `validate_while_typing` + enabled. + """ + while True: + # Don't call the validator again, if it was already called for the + # current input. + if self.validation_state != ValidationState.UNKNOWN: + return + + # Call validator. + error = None + document = self.document + + if self.validator: + try: + await self.validator.validate_async(self.document) + except ValidationError as e: + error = e + + # If the document changed during the validation, try again. + if self.document != document: + continue + + # Handle validation result. + if error: + self.validation_state = ValidationState.INVALID + else: + self.validation_state = ValidationState.VALID + + self.validation_error = error + get_app().invalidate() # Trigger redraw (display error). + + def append_to_history(self) -> None: + """ + Append the current input to the history. + """ + # Save at the tail of the history. (But don't if the last entry the + # history is already the same.) + if self.text: + history_strings = self.history.get_strings() + if not len(history_strings) or history_strings[-1] != self.text: + self.history.append_string(self.text) + + def _search( + self, + search_state: SearchState, + include_current_position: bool = False, + count: int = 1, + ) -> Optional[Tuple[int, int]]: + """ + Execute search. Return (working_index, cursor_position) tuple when this + search is applied. Returns `None` when this text cannot be found. + """ + assert count > 0 + + text = search_state.text + direction = search_state.direction + ignore_case = search_state.ignore_case() + + def search_once( + working_index: int, document: Document + ) -> Optional[Tuple[int, Document]]: + """ + Do search one time. + Return (working_index, document) or `None` + """ + if direction == SearchDirection.FORWARD: + # Try find at the current input. + new_index = document.find( + text, + include_current_position=include_current_position, + ignore_case=ignore_case, + ) + + if new_index is not None: + return ( + working_index, + Document(document.text, document.cursor_position + new_index), + ) + else: + # No match, go forward in the history. (Include len+1 to wrap around.) + # (Here we should always include all cursor positions, because + # it's a different line.) + for i in range(working_index + 1, len(self._working_lines) + 1): + i %= len(self._working_lines) + + document = Document(self._working_lines[i], 0) + new_index = document.find( + text, include_current_position=True, ignore_case=ignore_case + ) + if new_index is not None: + return (i, Document(document.text, new_index)) + else: + # Try find at the current input. + new_index = document.find_backwards(text, ignore_case=ignore_case) + + if new_index is not None: + return ( + working_index, + Document(document.text, document.cursor_position + new_index), + ) + else: + # No match, go back in the history. (Include -1 to wrap around.) + for i in range(working_index - 1, -2, -1): + i %= len(self._working_lines) + + document = Document( + self._working_lines[i], len(self._working_lines[i]) + ) + new_index = document.find_backwards( + text, ignore_case=ignore_case + ) + if new_index is not None: + return ( + i, + Document(document.text, len(document.text) + new_index), + ) + return None + + # Do 'count' search iterations. + working_index = self.working_index + document = self.document + for _ in range(count): + result = search_once(working_index, document) + if result is None: + return None # Nothing found. + else: + working_index, document = result + + return (working_index, document.cursor_position) + + def document_for_search(self, search_state: SearchState) -> Document: + """ + Return a :class:`~prompt_toolkit.document.Document` instance that has + the text/cursor position for this search, if we would apply it. This + will be used in the + :class:`~prompt_toolkit.layout.BufferControl` to display feedback while + searching. + """ + search_result = self._search(search_state, include_current_position=True) + + if search_result is None: + return self.document + else: + working_index, cursor_position = search_result + + # Keep selection, when `working_index` was not changed. + if working_index == self.working_index: + selection = self.selection_state + else: + selection = None + + return Document( + self._working_lines[working_index], cursor_position, selection=selection + ) + + def get_search_position( + self, + search_state: SearchState, + include_current_position: bool = True, + count: int = 1, + ) -> int: + """ + Get the cursor position for this search. + (This operation won't change the `working_index`. It's won't go through + the history. Vi text objects can't span multiple items.) + """ + search_result = self._search( + search_state, include_current_position=include_current_position, count=count + ) + + if search_result is None: + return self.cursor_position + else: + working_index, cursor_position = search_result + return cursor_position + + def apply_search( + self, + search_state: SearchState, + include_current_position: bool = True, + count: int = 1, + ) -> None: + """ + Apply search. If something is found, set `working_index` and + `cursor_position`. + """ + search_result = self._search( + search_state, include_current_position=include_current_position, count=count + ) + + if search_result is not None: + working_index, cursor_position = search_result + self.working_index = working_index + self.cursor_position = cursor_position + + def exit_selection(self) -> None: + self.selection_state = None + + def _editor_simple_tempfile(self) -> Tuple[str, Callable[[], None]]: + """ + Simple (file) tempfile implementation. + Return (tempfile, cleanup_func). + """ + suffix = to_str(self.tempfile_suffix) + descriptor, filename = tempfile.mkstemp(suffix) + + os.write(descriptor, self.text.encode("utf-8")) + os.close(descriptor) + + def cleanup() -> None: + os.unlink(filename) + + return filename, cleanup + + def _editor_complex_tempfile(self) -> Tuple[str, Callable[[], None]]: + # Complex (directory) tempfile implementation. + headtail = to_str(self.tempfile) + if not headtail: + # Revert to simple case. + return self._editor_simple_tempfile() + headtail = str(headtail) + + # Try to make according to tempfile logic. + head, tail = os.path.split(headtail) + if os.path.isabs(head): + head = head[1:] + + dirpath = tempfile.mkdtemp() + if head: + dirpath = os.path.join(dirpath, head) + # Assume there is no issue creating dirs in this temp dir. + os.makedirs(dirpath) + + # Open the filename and write current text. + filename = os.path.join(dirpath, tail) + with open(filename, "w", encoding="utf-8") as fh: + fh.write(self.text) + + def cleanup() -> None: + shutil.rmtree(dirpath) + + return filename, cleanup + + def open_in_editor(self, validate_and_handle: bool = False) -> "asyncio.Task[None]": + """ + Open code in editor. + + This returns a future, and runs in a thread executor. + """ + if self.read_only(): + raise EditReadOnlyBuffer() + + # Write current text to temporary file + if self.tempfile: + filename, cleanup_func = self._editor_complex_tempfile() + else: + filename, cleanup_func = self._editor_simple_tempfile() + + async def run() -> None: + try: + # Open in editor + # (We need to use `run_in_terminal`, because not all editors go to + # the alternate screen buffer, and some could influence the cursor + # position.) + succes = await run_in_terminal( + lambda: self._open_file_in_editor(filename), in_executor=True + ) + + # Read content again. + if succes: + with open(filename, "rb") as f: + text = f.read().decode("utf-8") + + # Drop trailing newline. (Editors are supposed to add it at the + # end, but we don't need it.) + if text.endswith("\n"): + text = text[:-1] + + self.document = Document(text=text, cursor_position=len(text)) + + # Accept the input. + if validate_and_handle: + self.validate_and_handle() + + finally: + # Clean up temp dir/file. + cleanup_func() + + return get_app().create_background_task(run()) + + def _open_file_in_editor(self, filename: str) -> bool: + """ + Call editor executable. + + Return True when we received a zero return code. + """ + # If the 'VISUAL' or 'EDITOR' environment variable has been set, use that. + # Otherwise, fall back to the first available editor that we can find. + visual = os.environ.get("VISUAL") + editor = os.environ.get("EDITOR") + + editors = [ + visual, + editor, + # Order of preference. + "/usr/bin/editor", + "/usr/bin/nano", + "/usr/bin/pico", + "/usr/bin/vi", + "/usr/bin/emacs", + ] + + for e in editors: + if e: + try: + # Use 'shlex.split()', because $VISUAL can contain spaces + # and quotes. + returncode = subprocess.call(shlex.split(e) + [filename]) + return returncode == 0 + + except OSError: + # Executable does not exist, try the next one. + pass + + return False + + def start_completion( + self, + select_first: bool = False, + select_last: bool = False, + insert_common_part: bool = False, + complete_event: Optional[CompleteEvent] = None, + ) -> None: + """ + Start asynchronous autocompletion of this buffer. + (This will do nothing if a previous completion was still in progress.) + """ + # Only one of these options can be selected. + assert select_first + select_last + insert_common_part <= 1 + + get_app().create_background_task( + self._async_completer( + select_first=select_first, + select_last=select_last, + insert_common_part=insert_common_part, + complete_event=complete_event + or CompleteEvent(completion_requested=True), + ) + ) + + def _create_completer_coroutine(self) -> Callable[..., Awaitable[None]]: + """ + Create function for asynchronous autocompletion. + + (This consumes the asynchronous completer generator, which possibly + runs the completion algorithm in another thread.) + """ + + def completion_does_nothing(document: Document, completion: Completion) -> bool: + """ + Return `True` if applying this completion doesn't have any effect. + (When it doesn't insert any new text. + """ + text_before_cursor = document.text_before_cursor + replaced_text = text_before_cursor[ + len(text_before_cursor) + completion.start_position : + ] + return replaced_text == completion.text + + @_only_one_at_a_time + async def async_completer( + select_first: bool = False, + select_last: bool = False, + insert_common_part: bool = False, + complete_event: Optional[CompleteEvent] = None, + ) -> None: + + document = self.document + complete_event = complete_event or CompleteEvent(text_inserted=True) + + # Don't complete when we already have completions. + if self.complete_state or not self.completer: + return + + # Create an empty CompletionState. + complete_state = CompletionState(original_document=self.document) + self.complete_state = complete_state + + def proceed() -> bool: + """Keep retrieving completions. Input text has not yet changed + while generating completions.""" + return self.complete_state == complete_state + + async for completion in self.completer.get_completions_async( + document, complete_event + ): + complete_state.completions.append(completion) + self.on_completions_changed.fire() + + # If the input text changes, abort. + if not proceed(): + break + + completions = complete_state.completions + + # When there is only one completion, which has nothing to add, ignore it. + if len(completions) == 1 and completion_does_nothing( + document, completions[0] + ): + del completions[:] + + # Set completions if the text was not yet changed. + if proceed(): + # When no completions were found, or when the user selected + # already a completion by using the arrow keys, don't do anything. + if ( + not self.complete_state + or self.complete_state.complete_index is not None + ): + return + + # When there are no completions, reset completion state anyway. + if not completions: + self.complete_state = None + # Render the ui if the completion menu was shown + # it is needed especially if there is one completion and it was deleted. + self.on_completions_changed.fire() + return + + # Select first/last or insert common part, depending on the key + # binding. (For this we have to wait until all completions are + # loaded.) + + if select_first: + self.go_to_completion(0) + + elif select_last: + self.go_to_completion(len(completions) - 1) + + elif insert_common_part: + common_part = get_common_complete_suffix(document, completions) + if common_part: + # Insert the common part, update completions. + self.insert_text(common_part) + if len(completions) > 1: + # (Don't call `async_completer` again, but + # recalculate completions. See: + # https://github.com/ipython/ipython/issues/9658) + completions[:] = [ + c.new_completion_from_position(len(common_part)) + for c in completions + ] + + self._set_completions(completions=completions) + else: + self.complete_state = None + else: + # When we were asked to insert the "common" + # prefix, but there was no common suffix but + # still exactly one match, then select the + # first. (It could be that we have a completion + # which does * expansion, like '*.py', with + # exactly one match.) + if len(completions) == 1: + self.go_to_completion(0) + + else: + # If the last operation was an insert, (not a delete), restart + # the completion coroutine. + + if self.document.text_before_cursor == document.text_before_cursor: + return # Nothing changed. + + if self.document.text_before_cursor.startswith( + document.text_before_cursor + ): + raise _Retry + + return async_completer + + def _create_auto_suggest_coroutine(self) -> Callable[[], Awaitable[None]]: + """ + Create function for asynchronous auto suggestion. + (This can be in another thread.) + """ + + @_only_one_at_a_time + async def async_suggestor() -> None: + document = self.document + + # Don't suggest when we already have a suggestion. + if self.suggestion or not self.auto_suggest: + return + + suggestion = await self.auto_suggest.get_suggestion_async(self, document) + + # Set suggestion only if the text was not yet changed. + if self.document == document: + # Set suggestion and redraw interface. + self.suggestion = suggestion + self.on_suggestion_set.fire() + else: + # Otherwise, restart thread. + raise _Retry + + return async_suggestor + + def _create_auto_validate_coroutine(self) -> Callable[[], Awaitable[None]]: + """ + Create a function for asynchronous validation while typing. + (This can be in another thread.) + """ + + @_only_one_at_a_time + async def async_validator() -> None: + await self._validate_async() + + return async_validator + + def validate_and_handle(self) -> None: + """ + Validate buffer and handle the accept action. + """ + valid = self.validate(set_cursor=True) + + # When the validation succeeded, accept the input. + if valid: + if self.accept_handler: + keep_text = self.accept_handler(self) + else: + keep_text = False + + self.append_to_history() + + if not keep_text: + self.reset() + + +_T = TypeVar("_T", bound=Callable[..., Awaitable[None]]) + + +def _only_one_at_a_time(coroutine: _T) -> _T: + """ + Decorator that only starts the coroutine only if the previous call has + finished. (Used to make sure that we have only one autocompleter, auto + suggestor and validator running at a time.) + + When the coroutine raises `_Retry`, it is restarted. + """ + running = False + + @wraps(coroutine) + async def new_coroutine(*a: Any, **kw: Any) -> Any: + nonlocal running + + # Don't start a new function, if the previous is still in progress. + if running: + return + + running = True + + try: + while True: + try: + await coroutine(*a, **kw) + except _Retry: + continue + else: + return None + finally: + running = False + + return cast(_T, new_coroutine) + + +class _Retry(Exception): + "Retry in `_only_one_at_a_time`." + + +def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None: + """ + Indent text of a :class:`.Buffer` object. + """ + current_row = buffer.document.cursor_position_row + line_range = range(from_row, to_row) + + # Apply transformation. + new_text = buffer.transform_lines(line_range, lambda l: " " * count + l) + buffer.document = Document( + new_text, Document(new_text).translate_row_col_to_index(current_row, 0) + ) + + # Go to the start of the line. + buffer.cursor_position += buffer.document.get_start_of_line_position( + after_whitespace=True + ) + + +def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None: + """ + Unindent text of a :class:`.Buffer` object. + """ + current_row = buffer.document.cursor_position_row + line_range = range(from_row, to_row) + + def transform(text: str) -> str: + remove = " " * count + if text.startswith(remove): + return text[len(remove) :] + else: + return text.lstrip() + + # Apply transformation. + new_text = buffer.transform_lines(line_range, transform) + buffer.document = Document( + new_text, Document(new_text).translate_row_col_to_index(current_row, 0) + ) + + # Go to the start of the line. + buffer.cursor_position += buffer.document.get_start_of_line_position( + after_whitespace=True + ) + + +def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None: + """ + Reformat text, taking the width into account. + `to_row` is included. + (Vi 'gq' operator.) + """ + lines = buffer.text.splitlines(True) + lines_before = lines[:from_row] + lines_after = lines[to_row + 1 :] + lines_to_reformat = lines[from_row : to_row + 1] + + if lines_to_reformat: + # Take indentation from the first line. + match = re.search(r"^\s*", lines_to_reformat[0]) + length = match.end() if match else 0 # `match` can't be None, actually. + + indent = lines_to_reformat[0][:length].replace("\n", "") + + # Now, take all the 'words' from the lines to be reshaped. + words = "".join(lines_to_reformat).split() + + # And reshape. + width = (buffer.text_width or 80) - len(indent) + reshaped_text = [indent] + current_width = 0 + for w in words: + if current_width: + if len(w) + current_width + 1 > width: + reshaped_text.append("\n") + reshaped_text.append(indent) + current_width = 0 + else: + reshaped_text.append(" ") + current_width += 1 + + reshaped_text.append(w) + current_width += len(w) + + if reshaped_text[-1] != "\n": + reshaped_text.append("\n") + + # Apply result. + buffer.document = Document( + text="".join(lines_before + reshaped_text + lines_after), + cursor_position=len("".join(lines_before + reshaped_text)), + ) |