aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/ipython/py3/IPython/terminal/shortcuts/auto_suggest.py
blob: bcba5622e49cab64b5520c2e15c60b99fd38d418 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
import re
import asyncio
import tokenize
from io import StringIO
from typing import Callable, List, Optional, Union, Generator, Tuple, ClassVar, Any
import warnings

import prompt_toolkit
from prompt_toolkit.buffer import Buffer
from prompt_toolkit.key_binding import KeyPressEvent
from prompt_toolkit.key_binding.bindings import named_commands as nc
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion, AutoSuggest
from prompt_toolkit.document import Document
from prompt_toolkit.history import History
from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.layout.processors import (
    Processor,
    Transformation,
    TransformationInput,
)

from IPython.core.getipython import get_ipython
from IPython.utils.tokenutil import generate_tokens

from .filters import pass_through

try:
    import jupyter_ai_magics
    import jupyter_ai.completions.models as jai_models
except ModuleNotFoundError:
    jai_models = None


def _get_query(document: Document):
    return document.lines[document.cursor_position_row]


class AppendAutoSuggestionInAnyLine(Processor):
    """
    Append the auto suggestion to lines other than the last (appending to the
    last line is natively supported by the prompt toolkit).

    This has a private `_debug` attribute that can be set to True to display
    debug information as virtual suggestion on the end of any line. You can do
    so with:

        >>> from IPython.terminal.shortcuts.auto_suggest import AppendAutoSuggestionInAnyLine
        >>> AppendAutoSuggestionInAnyLine._debug = True

    """

    _debug: ClassVar[bool] = False

    def __init__(self, style: str = "class:auto-suggestion") -> None:
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
         Apply transformation to the line that is currently being edited.

         This is a variation of the original implementation in prompt toolkit
         that allows to not only append suggestions to any line, but also to show
         multi-line suggestions.

         As transformation are applied on a line-by-line basis; we need to trick
         a bit, and elide any line that is after the line we are currently
         editing, until we run out of completions. We cannot shift the existing
         lines

         There are multiple cases to handle:

         The completions ends before the end of the buffer:
             We can resume showing the normal line, and say that some code may
             be hidden.

        The completions ends at the end of the buffer
             We can just say that some code may be hidden.

         And separately:

         The completions ends beyond the end of the buffer
             We need to both say that some code may be hidden, and that some
             lines are not shown.

        """
        last_line_number = ti.document.line_count - 1
        is_last_line = ti.lineno == last_line_number

        noop = lambda text: Transformation(
            fragments=ti.fragments + [(self.style, " " + text if self._debug else "")]
        )
        if ti.document.line_count == 1:
            return noop("noop:oneline")
        if ti.document.cursor_position_row == last_line_number and is_last_line:
            # prompt toolkit already appends something; just leave it be
            return noop("noop:last line and cursor")

        # first everything before the current line is unchanged.
        if ti.lineno < ti.document.cursor_position_row:
            return noop("noop:before cursor")

        buffer = ti.buffer_control.buffer
        if not buffer.suggestion or not ti.document.is_cursor_at_the_end_of_line:
            return noop("noop:not eol")

        delta = ti.lineno - ti.document.cursor_position_row
        suggestions = buffer.suggestion.text.splitlines()

        if len(suggestions) == 0:
            return noop("noop: no suggestions")

        suggestions_longer_than_buffer: bool = (
            len(suggestions) + ti.document.cursor_position_row > ti.document.line_count
        )

        if len(suggestions) >= 1 and prompt_toolkit.VERSION < (3, 0, 49):
            if ti.lineno == ti.document.cursor_position_row:
                return Transformation(
                    fragments=ti.fragments
                    + [
                        (
                            "red",
                            "(Cannot show multiline suggestion; requires prompt_toolkit > 3.0.49)",
                        )
                    ]
                )
            else:
                return Transformation(fragments=ti.fragments)
        if delta == 0:
            suggestion = suggestions[0]
            return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
        if is_last_line:
            if delta < len(suggestions):
                extra = f"; {len(suggestions) - delta} line(s) hidden"
                suggestion = f"… rest of suggestion ({len(suggestions) - delta} lines) and code hidden"
                return Transformation([(self.style, suggestion)])

            n_elided = len(suggestions)
            for i in range(len(suggestions)):
                ll = ti.get_line(last_line_number - i)
                el = "".join(l[1] for l in ll).strip()
                if el:
                    break
                else:
                    n_elided -= 1
            if n_elided:
                return Transformation([(self.style, f"… {n_elided} line(s) hidden")])
            else:
                return Transformation(
                    ti.get_line(last_line_number - len(suggestions) + 1)
                    + ([(self.style, "shift-last-line")] if self._debug else [])
                )

        elif delta < len(suggestions):
            suggestion = suggestions[delta]
            return Transformation([(self.style, suggestion)])
        else:
            shift = ti.lineno - len(suggestions) + 1
            return Transformation(ti.get_line(shift))


class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory):
    """
    A subclass of AutoSuggestFromHistory that allow navigation to next/previous
    suggestion from history. To do so it remembers the current position, but it
    state need to carefully be cleared on the right events.
    """

    skip_lines: int
    _connected_apps: list[PromptSession]

    # handle to the currently running llm task that appends suggestions to the
    # current buffer; we keep a handle to it in order to cancell it when there is a cursor movement, or
    # another request.
    _llm_task: asyncio.Task | None = None

    # This is the instance of the LLM provider from jupyter-ai to which we forward the request
    # to generate inline completions.
    _llm_provider: Any | None

    def __init__(self):
        super().__init__()
        self.skip_lines = 0
        self._connected_apps = []
        self._llm_provider = None

    def reset_history_position(self, _: Buffer):
        self.skip_lines = 0

    def disconnect(self) -> None:
        self._cancel_running_llm_task()
        for pt_app in self._connected_apps:
            text_insert_event = pt_app.default_buffer.on_text_insert
            text_insert_event.remove_handler(self.reset_history_position)

    def connect(self, pt_app: PromptSession):
        self._connected_apps.append(pt_app)
        # note: `on_text_changed` could be used for a bit different behaviour
        # on character deletion (i.e. resetting history position on backspace)
        pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position)
        pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss)

    def get_suggestion(
        self, buffer: Buffer, document: Document
    ) -> Optional[Suggestion]:
        text = _get_query(document)

        if text.strip():
            for suggestion, _ in self._find_next_match(
                text, self.skip_lines, buffer.history
            ):
                return Suggestion(suggestion)

        return None

    def _dismiss(self, buffer, *args, **kwargs) -> None:
        self._cancel_running_llm_task()
        buffer.suggestion = None

    def _find_match(
        self, text: str, skip_lines: float, history: History, previous: bool
    ) -> Generator[Tuple[str, float], None, None]:
        """
        text : str
            Text content to find a match for, the user cursor is most of the
            time at the end of this text.
        skip_lines : float
            number of items to skip in the search, this is used to indicate how
            far in the list the user has navigated by pressing up or down.
            The float type is used as the base value is +inf
        history : History
            prompt_toolkit History instance to fetch previous entries from.
        previous : bool
            Direction of the search, whether we are looking previous match
            (True), or next match (False).

        Yields
        ------
        Tuple with:
        str:
            current suggestion.
        float:
            will actually yield only ints, which is passed back via skip_lines,
            which may be a +inf (float)


        """
        line_number = -1
        for string in reversed(list(history.get_strings())):
            for line in reversed(string.splitlines()):
                line_number += 1
                if not previous and line_number < skip_lines:
                    continue
                # do not return empty suggestions as these
                # close the auto-suggestion overlay (and are useless)
                if line.startswith(text) and len(line) > len(text):
                    yield line[len(text) :], line_number
                if previous and line_number >= skip_lines:
                    return

    def _find_next_match(
        self, text: str, skip_lines: float, history: History
    ) -> Generator[Tuple[str, float], None, None]:
        return self._find_match(text, skip_lines, history, previous=False)

    def _find_previous_match(self, text: str, skip_lines: float, history: History):
        return reversed(
            list(self._find_match(text, skip_lines, history, previous=True))
        )

    def up(self, query: str, other_than: str, history: History) -> None:
        self._cancel_running_llm_task()
        for suggestion, line_number in self._find_next_match(
            query, self.skip_lines, history
        ):
            # if user has history ['very.a', 'very', 'very.b'] and typed 'very'
            # we want to switch from 'very.b' to 'very.a' because a) if the
            # suggestion equals current text, prompt-toolkit aborts suggesting
            # b) user likely would not be interested in 'very' anyways (they
            # already typed it).
            if query + suggestion != other_than:
                self.skip_lines = line_number
                break
        else:
            # no matches found, cycle back to beginning
            self.skip_lines = 0

    def down(self, query: str, other_than: str, history: History) -> None:
        self._cancel_running_llm_task()
        for suggestion, line_number in self._find_previous_match(
            query, self.skip_lines, history
        ):
            if query + suggestion != other_than:
                self.skip_lines = line_number
                break
        else:
            # no matches found, cycle to end
            for suggestion, line_number in self._find_previous_match(
                query, float("Inf"), history
            ):
                if query + suggestion != other_than:
                    self.skip_lines = line_number
                    break

    def _cancel_running_llm_task(self) -> None:
        """
        Try to cancell the currently running llm_task if exists, and set it to None.
        """
        if self._llm_task is not None:
            if self._llm_task.done():
                self._llm_task = None
                return
            cancelled = self._llm_task.cancel()
            if cancelled:
                self._llm_task = None
            if not cancelled:
                warnings.warn(
                    "LLM task not cancelled, does your provider support cancellation?"
                )

    async def _trigger_llm(self, buffer) -> None:
        """
        This will ask the current llm provider a suggestion for the current buffer.

        If there is a currently running llm task, it will cancel it.
        """
        # we likely want to store the current cursor position, and cancel if the cursor has moved.
        if not self._llm_provider:
            warnings.warn("No LLM provider found, cannot trigger LLM completions")
            return
        if jai_models is None:
            warnings.warn(
                "LLM Completion requires `jupyter_ai_magics` and `jupyter_ai` to be installed"
            )

        self._cancel_running_llm_task()

        async def error_catcher(buffer):
            """
            This catches and log any errors, as otherwise this is just
            lost in the void of the future running task.
            """
            try:
                await self._trigger_llm_core(buffer)
            except Exception as e:
                get_ipython().log.error("error")
                raise

        # here we need a cancellable task so we can't just await the error catched
        self._llm_task = asyncio.create_task(error_catcher(buffer))
        await self._llm_task

    async def _trigger_llm_core(self, buffer: Buffer):
        """
        This is the core of the current llm request.

        Here we build a compatible `InlineCompletionRequest` and ask the llm
        provider to stream it's response back to us iteratively setting it as
        the suggestion on the current buffer.

        Unlike with JupyterAi, as we do not have multiple cell, the cell number
        is always set to `0`, note that we _could_ set it to a new number each
        time and ignore threply from past numbers.

        We set the prefix to the current cell content, but could also inset the
        rest of the history or even just the non-fail history.

        In the same way, we do not have cell id.

        LLM provider may return multiple suggestion stream, but for the time
        being we only support one.

        Here we make the assumption that the provider will have
        stream_inline_completions, I'm not sure it is the case for all
        providers.
        """

        request = jai_models.InlineCompletionRequest(
            number=0,
            prefix=buffer.document.text,
            suffix="",
            mime="text/x-python",
            stream=True,
            path=None,
            language="python",
            cell_id=None,
        )

        async for reply_and_chunks in self._llm_provider.stream_inline_completions(
            request
        ):
            if isinstance(reply_and_chunks, jai_models.InlineCompletionReply):
                if len(reply_and_chunks.list.items) > 1:
                    raise ValueError(
                        "Terminal IPython cannot deal with multiple LLM suggestions at once"
                    )
                buffer.suggestion = Suggestion(
                    reply_and_chunks.list.items[0].insertText
                )
                buffer.on_suggestion_set.fire()
            elif isinstance(reply_and_chunks, jai_models.InlineCompletionStreamChunk):
                buffer.suggestion = Suggestion(reply_and_chunks.response.insertText)
                buffer.on_suggestion_set.fire()
        return


_MIN_LINES = 5


async def llm_autosuggestion(event: KeyPressEvent):
    """
    Ask the AutoSuggester from history to delegate to ask an LLM for completion

    This will first make sure that the current buffer have _MIN_LINES (7)
    available lines to insert the LLM completion

    Provisional as of 8.32, may change without warnigns

    """
    provider = get_ipython().auto_suggest
    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return
    doc = event.current_buffer.document
    lines_to_insert = max(0, _MIN_LINES - doc.line_count + doc.cursor_position_row)
    for _ in range(lines_to_insert):
        event.current_buffer.insert_text("\n", move_cursor=False)

    await provider._trigger_llm(event.current_buffer)


def accept_or_jump_to_end(event: KeyPressEvent):
    """Apply autosuggestion or jump to end of line."""
    buffer = event.current_buffer
    d = buffer.document
    after_cursor = d.text[d.cursor_position :]
    lines = after_cursor.split("\n")
    end_of_current_line = lines[0].strip()
    suggestion = buffer.suggestion
    if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""):
        buffer.insert_text(suggestion.text)
    else:
        nc.end_of_line(event)


def _deprected_accept_in_vi_insert_mode(event: KeyPressEvent):
    """Accept autosuggestion or jump to end of line.

    .. deprecated:: 8.12
        Use `accept_or_jump_to_end` instead.
    """
    return accept_or_jump_to_end(event)


def accept(event: KeyPressEvent):
    """Accept autosuggestion"""
    buffer = event.current_buffer
    suggestion = buffer.suggestion
    if suggestion:
        buffer.insert_text(suggestion.text)
    else:
        nc.forward_char(event)


def discard(event: KeyPressEvent):
    """Discard autosuggestion"""
    buffer = event.current_buffer
    buffer.suggestion = None


def accept_word(event: KeyPressEvent):
    """Fill partial autosuggestion by word"""
    buffer = event.current_buffer
    suggestion = buffer.suggestion
    if suggestion:
        t = re.split(r"(\S+\s+)", suggestion.text)
        buffer.insert_text(next((x for x in t if x), ""))
    else:
        nc.forward_word(event)


def accept_character(event: KeyPressEvent):
    """Fill partial autosuggestion by character"""
    b = event.current_buffer
    suggestion = b.suggestion
    if suggestion and suggestion.text:
        b.insert_text(suggestion.text[0])


def accept_and_keep_cursor(event: KeyPressEvent):
    """Accept autosuggestion and keep cursor in place"""
    buffer = event.current_buffer
    old_position = buffer.cursor_position
    suggestion = buffer.suggestion
    if suggestion:
        buffer.insert_text(suggestion.text)
        buffer.cursor_position = old_position


def accept_and_move_cursor_left(event: KeyPressEvent):
    """Accept autosuggestion and move cursor left in place"""
    accept_and_keep_cursor(event)
    nc.backward_char(event)


def _update_hint(buffer: Buffer):
    if buffer.auto_suggest:
        suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document)
        buffer.suggestion = suggestion


def backspace_and_resume_hint(event: KeyPressEvent):
    """Resume autosuggestions after deleting last character"""
    nc.backward_delete_char(event)
    _update_hint(event.current_buffer)


def resume_hinting(event: KeyPressEvent):
    """Resume autosuggestions"""
    pass_through.reply(event)
    # Order matters: if update happened first and event reply second, the
    # suggestion would be auto-accepted if both actions are bound to same key.
    _update_hint(event.current_buffer)


def up_and_update_hint(event: KeyPressEvent):
    """Go up and update hint"""
    current_buffer = event.current_buffer

    current_buffer.auto_up(count=event.arg)
    _update_hint(current_buffer)


def down_and_update_hint(event: KeyPressEvent):
    """Go down and update hint"""
    current_buffer = event.current_buffer

    current_buffer.auto_down(count=event.arg)
    _update_hint(current_buffer)


def accept_token(event: KeyPressEvent):
    """Fill partial autosuggestion by token"""
    b = event.current_buffer
    suggestion = b.suggestion

    if suggestion:
        prefix = _get_query(b.document)
        text = prefix + suggestion.text

        tokens: List[Optional[str]] = [None, None, None]
        substrings = [""]
        i = 0

        for token in generate_tokens(StringIO(text).readline):
            if token.type == tokenize.NEWLINE:
                index = len(text)
            else:
                index = text.index(token[1], len(substrings[-1]))
            substrings.append(text[:index])
            tokenized_so_far = substrings[-1]
            if tokenized_so_far.startswith(prefix):
                if i == 0 and len(tokenized_so_far) > len(prefix):
                    tokens[0] = tokenized_so_far[len(prefix) :]
                    substrings.append(tokenized_so_far)
                    i += 1
                tokens[i] = token[1]
                if i == 2:
                    break
                i += 1

        if tokens[0]:
            to_insert: str
            insert_text = substrings[-2]
            if tokens[1] and len(tokens[1]) == 1:
                insert_text = substrings[-1]
            to_insert = insert_text[len(prefix) :]
            b.insert_text(to_insert)
            return

    nc.forward_word(event)


Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]


def _swap_autosuggestion(
    buffer: Buffer,
    provider: NavigableAutoSuggestFromHistory,
    direction_method: Callable,
):
    """
    We skip most recent history entry (in either direction) if it equals the
    current autosuggestion because if user cycles when auto-suggestion is shown
    they most likely want something else than what was suggested (otherwise
    they would have accepted the suggestion).
    """
    suggestion = buffer.suggestion
    if not suggestion:
        return

    query = _get_query(buffer.document)
    current = query + suggestion.text

    direction_method(query=query, other_than=current, history=buffer.history)

    new_suggestion = provider.get_suggestion(buffer, buffer.document)
    buffer.suggestion = new_suggestion


def swap_autosuggestion_up(event: KeyPressEvent):
    """Get next autosuggestion from history."""
    shell = get_ipython()
    provider = shell.auto_suggest

    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return

    return _swap_autosuggestion(
        buffer=event.current_buffer, provider=provider, direction_method=provider.up
    )


def swap_autosuggestion_down(event: KeyPressEvent):
    """Get previous autosuggestion from history."""
    shell = get_ipython()
    provider = shell.auto_suggest

    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return

    return _swap_autosuggestion(
        buffer=event.current_buffer,
        provider=provider,
        direction_method=provider.down,
    )


def __getattr__(key):
    if key == "accept_in_vi_insert_mode":
        warnings.warn(
            "`accept_in_vi_insert_mode` is deprecated since IPython 8.12 and "
            "renamed to `accept_or_jump_to_end`. Please update your configuration "
            "accordingly",
            DeprecationWarning,
            stacklevel=2,
        )
        return _deprected_accept_in_vi_insert_mode
    raise AttributeError