diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-02-15 09:34:32 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-02-15 11:36:46 +0300 |
commit | b78775e5a25dfb7551cdc06dba96cdfe6e9bd6fb (patch) | |
tree | 8fdac4a27404b9036f50883e9afe4e3f37c4c39a /contrib/python/ipython/py3/IPython/terminal/shortcuts | |
parent | 784038d7404cb679026c8cc19204497e8411c75a (diff) | |
download | ydb-b78775e5a25dfb7551cdc06dba96cdfe6e9bd6fb.tar.gz |
Intermediate changes
commit_hash:293c725da86af9df83cab900e86bf2b75cc6b4e8
Diffstat (limited to 'contrib/python/ipython/py3/IPython/terminal/shortcuts')
-rw-r--r-- | contrib/python/ipython/py3/IPython/terminal/shortcuts/auto_suggest.py | 279 |
1 files changed, 263 insertions, 16 deletions
diff --git a/contrib/python/ipython/py3/IPython/terminal/shortcuts/auto_suggest.py b/contrib/python/ipython/py3/IPython/terminal/shortcuts/auto_suggest.py index 94a94a88c1e..bcba5622e49 100644 --- a/contrib/python/ipython/py3/IPython/terminal/shortcuts/auto_suggest.py +++ b/contrib/python/ipython/py3/IPython/terminal/shortcuts/auto_suggest.py @@ -1,13 +1,15 @@ import re +import asyncio import tokenize from io import StringIO -from typing import Callable, List, Optional, Union, Generator, Tuple +from typing import Callable, List, Optional, Union, Generator, Tuple, ClassVar, Any import warnings +import prompt_toolkit from prompt_toolkit.buffer import Buffer from prompt_toolkit.key_binding import KeyPressEvent from prompt_toolkit.key_binding.bindings import named_commands as nc -from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion +from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion, AutoSuggest from prompt_toolkit.document import Document from prompt_toolkit.history import History from prompt_toolkit.shortcuts import PromptSession @@ -22,6 +24,12 @@ from IPython.utils.tokenutil import generate_tokens from .filters import pass_through +try: + import jupyter_ai_magics + import jupyter_ai.completions.models as jai_models +except ModuleNotFoundError: + jai_models = None + def _get_query(document: Document): return document.lines[document.cursor_position_row] @@ -31,26 +39,124 @@ class AppendAutoSuggestionInAnyLine(Processor): """ Append the auto suggestion to lines other than the last (appending to the last line is natively supported by the prompt toolkit). + + This has a private `_debug` attribute that can be set to True to display + debug information as virtual suggestion on the end of any line. You can do + so with: + + >>> from IPython.terminal.shortcuts.auto_suggest import AppendAutoSuggestionInAnyLine + >>> AppendAutoSuggestionInAnyLine._debug = True + """ + _debug: ClassVar[bool] = False + def __init__(self, style: str = "class:auto-suggestion") -> None: self.style = style def apply_transformation(self, ti: TransformationInput) -> Transformation: - is_last_line = ti.lineno == ti.document.line_count - 1 - is_active_line = ti.lineno == ti.document.cursor_position_row + """ + Apply transformation to the line that is currently being edited. - if not is_last_line and is_active_line: - buffer = ti.buffer_control.buffer + This is a variation of the original implementation in prompt toolkit + that allows to not only append suggestions to any line, but also to show + multi-line suggestions. - if buffer.suggestion and ti.document.is_cursor_at_the_end_of_line: - suggestion = buffer.suggestion.text - else: - suggestion = "" + As transformation are applied on a line-by-line basis; we need to trick + a bit, and elide any line that is after the line we are currently + editing, until we run out of completions. We cannot shift the existing + lines + + There are multiple cases to handle: + + The completions ends before the end of the buffer: + We can resume showing the normal line, and say that some code may + be hidden. + + The completions ends at the end of the buffer + We can just say that some code may be hidden. + + And separately: + + The completions ends beyond the end of the buffer + We need to both say that some code may be hidden, and that some + lines are not shown. + + """ + last_line_number = ti.document.line_count - 1 + is_last_line = ti.lineno == last_line_number + + noop = lambda text: Transformation( + fragments=ti.fragments + [(self.style, " " + text if self._debug else "")] + ) + if ti.document.line_count == 1: + return noop("noop:oneline") + if ti.document.cursor_position_row == last_line_number and is_last_line: + # prompt toolkit already appends something; just leave it be + return noop("noop:last line and cursor") + + # first everything before the current line is unchanged. + if ti.lineno < ti.document.cursor_position_row: + return noop("noop:before cursor") + + buffer = ti.buffer_control.buffer + if not buffer.suggestion or not ti.document.is_cursor_at_the_end_of_line: + return noop("noop:not eol") + + delta = ti.lineno - ti.document.cursor_position_row + suggestions = buffer.suggestion.text.splitlines() + if len(suggestions) == 0: + return noop("noop: no suggestions") + + suggestions_longer_than_buffer: bool = ( + len(suggestions) + ti.document.cursor_position_row > ti.document.line_count + ) + + if len(suggestions) >= 1 and prompt_toolkit.VERSION < (3, 0, 49): + if ti.lineno == ti.document.cursor_position_row: + return Transformation( + fragments=ti.fragments + + [ + ( + "red", + "(Cannot show multiline suggestion; requires prompt_toolkit > 3.0.49)", + ) + ] + ) + else: + return Transformation(fragments=ti.fragments) + if delta == 0: + suggestion = suggestions[0] return Transformation(fragments=ti.fragments + [(self.style, suggestion)]) + if is_last_line: + if delta < len(suggestions): + extra = f"; {len(suggestions) - delta} line(s) hidden" + suggestion = f"… rest of suggestion ({len(suggestions) - delta} lines) and code hidden" + return Transformation([(self.style, suggestion)]) + + n_elided = len(suggestions) + for i in range(len(suggestions)): + ll = ti.get_line(last_line_number - i) + el = "".join(l[1] for l in ll).strip() + if el: + break + else: + n_elided -= 1 + if n_elided: + return Transformation([(self.style, f"… {n_elided} line(s) hidden")]) + else: + return Transformation( + ti.get_line(last_line_number - len(suggestions) + 1) + + ([(self.style, "shift-last-line")] if self._debug else []) + ) + + elif delta < len(suggestions): + suggestion = suggestions[delta] + return Transformation([(self.style, suggestion)]) else: - return Transformation(fragments=ti.fragments) + shift = ti.lineno - len(suggestions) + 1 + return Transformation(ti.get_line(shift)) class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory): @@ -60,16 +166,29 @@ class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory): state need to carefully be cleared on the right events. """ - def __init__( - self, - ): + skip_lines: int + _connected_apps: list[PromptSession] + + # handle to the currently running llm task that appends suggestions to the + # current buffer; we keep a handle to it in order to cancell it when there is a cursor movement, or + # another request. + _llm_task: asyncio.Task | None = None + + # This is the instance of the LLM provider from jupyter-ai to which we forward the request + # to generate inline completions. + _llm_provider: Any | None + + def __init__(self): + super().__init__() self.skip_lines = 0 self._connected_apps = [] + self._llm_provider = None def reset_history_position(self, _: Buffer): self.skip_lines = 0 - def disconnect(self): + def disconnect(self) -> None: + self._cancel_running_llm_task() for pt_app in self._connected_apps: text_insert_event = pt_app.default_buffer.on_text_insert text_insert_event.remove_handler(self.reset_history_position) @@ -94,7 +213,8 @@ class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory): return None - def _dismiss(self, buffer, *args, **kwargs): + def _dismiss(self, buffer, *args, **kwargs) -> None: + self._cancel_running_llm_task() buffer.suggestion = None def _find_match( @@ -149,6 +269,7 @@ class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory): ) def up(self, query: str, other_than: str, history: History) -> None: + self._cancel_running_llm_task() for suggestion, line_number in self._find_next_match( query, self.skip_lines, history ): @@ -165,6 +286,7 @@ class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory): self.skip_lines = 0 def down(self, query: str, other_than: str, history: History) -> None: + self._cancel_running_llm_task() for suggestion, line_number in self._find_previous_match( query, self.skip_lines, history ): @@ -180,6 +302,131 @@ class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory): self.skip_lines = line_number break + def _cancel_running_llm_task(self) -> None: + """ + Try to cancell the currently running llm_task if exists, and set it to None. + """ + if self._llm_task is not None: + if self._llm_task.done(): + self._llm_task = None + return + cancelled = self._llm_task.cancel() + if cancelled: + self._llm_task = None + if not cancelled: + warnings.warn( + "LLM task not cancelled, does your provider support cancellation?" + ) + + async def _trigger_llm(self, buffer) -> None: + """ + This will ask the current llm provider a suggestion for the current buffer. + + If there is a currently running llm task, it will cancel it. + """ + # we likely want to store the current cursor position, and cancel if the cursor has moved. + if not self._llm_provider: + warnings.warn("No LLM provider found, cannot trigger LLM completions") + return + if jai_models is None: + warnings.warn( + "LLM Completion requires `jupyter_ai_magics` and `jupyter_ai` to be installed" + ) + + self._cancel_running_llm_task() + + async def error_catcher(buffer): + """ + This catches and log any errors, as otherwise this is just + lost in the void of the future running task. + """ + try: + await self._trigger_llm_core(buffer) + except Exception as e: + get_ipython().log.error("error") + raise + + # here we need a cancellable task so we can't just await the error catched + self._llm_task = asyncio.create_task(error_catcher(buffer)) + await self._llm_task + + async def _trigger_llm_core(self, buffer: Buffer): + """ + This is the core of the current llm request. + + Here we build a compatible `InlineCompletionRequest` and ask the llm + provider to stream it's response back to us iteratively setting it as + the suggestion on the current buffer. + + Unlike with JupyterAi, as we do not have multiple cell, the cell number + is always set to `0`, note that we _could_ set it to a new number each + time and ignore threply from past numbers. + + We set the prefix to the current cell content, but could also inset the + rest of the history or even just the non-fail history. + + In the same way, we do not have cell id. + + LLM provider may return multiple suggestion stream, but for the time + being we only support one. + + Here we make the assumption that the provider will have + stream_inline_completions, I'm not sure it is the case for all + providers. + """ + + request = jai_models.InlineCompletionRequest( + number=0, + prefix=buffer.document.text, + suffix="", + mime="text/x-python", + stream=True, + path=None, + language="python", + cell_id=None, + ) + + async for reply_and_chunks in self._llm_provider.stream_inline_completions( + request + ): + if isinstance(reply_and_chunks, jai_models.InlineCompletionReply): + if len(reply_and_chunks.list.items) > 1: + raise ValueError( + "Terminal IPython cannot deal with multiple LLM suggestions at once" + ) + buffer.suggestion = Suggestion( + reply_and_chunks.list.items[0].insertText + ) + buffer.on_suggestion_set.fire() + elif isinstance(reply_and_chunks, jai_models.InlineCompletionStreamChunk): + buffer.suggestion = Suggestion(reply_and_chunks.response.insertText) + buffer.on_suggestion_set.fire() + return + + +_MIN_LINES = 5 + + +async def llm_autosuggestion(event: KeyPressEvent): + """ + Ask the AutoSuggester from history to delegate to ask an LLM for completion + + This will first make sure that the current buffer have _MIN_LINES (7) + available lines to insert the LLM completion + + Provisional as of 8.32, may change without warnigns + + """ + provider = get_ipython().auto_suggest + if not isinstance(provider, NavigableAutoSuggestFromHistory): + return + doc = event.current_buffer.document + lines_to_insert = max(0, _MIN_LINES - doc.line_count + doc.cursor_position_row) + for _ in range(lines_to_insert): + event.current_buffer.insert_text("\n", move_cursor=False) + + await provider._trigger_llm(event.current_buffer) + def accept_or_jump_to_end(event: KeyPressEvent): """Apply autosuggestion or jump to end of line.""" |