import re
import asyncio
import tokenize
from io import StringIO
from typing import Callable, List, Optional, Union, Generator, Tuple, ClassVar, Any
import warnings
import prompt_toolkit
from prompt_toolkit.buffer import Buffer
from prompt_toolkit.key_binding import KeyPressEvent
from prompt_toolkit.key_binding.bindings import named_commands as nc
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion, AutoSuggest
from prompt_toolkit.document import Document
from prompt_toolkit.history import History
from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.layout.processors import (
Processor,
Transformation,
TransformationInput,
)
from IPython.core.getipython import get_ipython
from IPython.utils.tokenutil import generate_tokens
from .filters import pass_through
try:
import jupyter_ai_magics
import jupyter_ai.completions.models as jai_models
except ModuleNotFoundError:
jai_models = None
def _get_query(document: Document):
    """Return the content of the line the cursor is currently on."""
    return document.lines[document.cursor_position_row]
class AppendAutoSuggestionInAnyLine(Processor):
"""
Append the auto suggestion to lines other than the last (appending to the
last line is natively supported by the prompt toolkit).
    This has a private `_debug` attribute that can be set to True to display
    debug information as a virtual suggestion at the end of any line. You can
    do so with:
>>> from IPython.terminal.shortcuts.auto_suggest import AppendAutoSuggestionInAnyLine
>>> AppendAutoSuggestionInAnyLine._debug = True
"""
_debug: ClassVar[bool] = False
def __init__(self, style: str = "class:auto-suggestion") -> None:
self.style = style
def apply_transformation(self, ti: TransformationInput) -> Transformation:
"""
        Apply transformation to the line that is currently being edited.

        This is a variation of the original implementation in prompt toolkit
        that allows us not only to append suggestions to any line, but also to
        show multi-line suggestions.

        As transformations are applied on a line-by-line basis, we need to
        trick a bit and elide any line that is after the line we are currently
        editing, until we run out of completions; we cannot shift the existing
        lines down.

        There are multiple cases to handle:

        - The completion ends before the end of the buffer:
          we can resume showing the normal lines, and say that some code may
          be hidden.
        - The completion ends at the end of the buffer:
          we can just say that some code may be hidden.

        And separately:

        - The completion ends beyond the end of the buffer:
          we need to both say that some code may be hidden, and that some
          lines are not shown.
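
        For illustration (an example added here, assuming a recent
        prompt_toolkit, i.e. >= 3.0.49): with a 3-line buffer, the cursor at
        the end of line 0, and a 4-line suggestion, line 0 keeps its own
        fragments plus the first suggestion line, line 1 is replaced by the
        second suggestion line, and the last visible line is replaced by an
        "… rest of suggestion (2 lines) and code hidden" marker.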
"""
last_line_number = ti.document.line_count - 1
is_last_line = ti.lineno == last_line_number
noop = lambda text: Transformation(
fragments=ti.fragments + [(self.style, " " + text if self._debug else "")]
)
if ti.document.line_count == 1:
return noop("noop:oneline")
if ti.document.cursor_position_row == last_line_number and is_last_line:
# prompt toolkit already appends something; just leave it be
return noop("noop:last line and cursor")
# first everything before the current line is unchanged.
if ti.lineno < ti.document.cursor_position_row:
return noop("noop:before cursor")
buffer = ti.buffer_control.buffer
if not buffer.suggestion or not ti.document.is_cursor_at_the_end_of_line:
return noop("noop:not eol")
delta = ti.lineno - ti.document.cursor_position_row
suggestions = buffer.suggestion.text.splitlines()
if len(suggestions) == 0:
return noop("noop: no suggestions")
suggestions_longer_than_buffer: bool = (
len(suggestions) + ti.document.cursor_position_row > ti.document.line_count
)
        if len(suggestions) > 1 and prompt_toolkit.VERSION < (3, 0, 49):
if ti.lineno == ti.document.cursor_position_row:
return Transformation(
fragments=ti.fragments
+ [
(
"red",
"(Cannot show multiline suggestion; requires prompt_toolkit > 3.0.49)",
)
]
)
else:
return Transformation(fragments=ti.fragments)
if delta == 0:
suggestion = suggestions[0]
return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
if is_last_line:
            if delta < len(suggestions):
                hidden = len(suggestions) - delta
                suggestion = f"… rest of suggestion ({hidden} lines) and code hidden"
return Transformation([(self.style, suggestion)])
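            # The suggestion ends above this (last) visible line, so some of
            # the original trailing lines are pushed out of view. If any of
            # them is non-blank, show a "… N line(s) hidden" marker; if they
            # are all blank, show the line that lands here after the shift.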
n_elided = len(suggestions)
for i in range(len(suggestions)):
ll = ti.get_line(last_line_number - i)
el = "".join(l[1] for l in ll).strip()
if el:
break
else:
n_elided -= 1
if n_elided:
return Transformation([(self.style, f"… {n_elided} line(s) hidden")])
else:
return Transformation(
ti.get_line(last_line_number - len(suggestions) + 1)
+ ([(self.style, "shift-last-line")] if self._debug else [])
)
elif delta < len(suggestions):
suggestion = suggestions[delta]
return Transformation([(self.style, suggestion)])
else:
shift = ti.lineno - len(suggestions) + 1
return Transformation(ti.get_line(shift))
class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory):
"""
    A subclass of AutoSuggestFromHistory that allows navigation to the next or
    previous suggestion from history. To do so it remembers the current
    position, but its state needs to be carefully cleared on the right events.
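
    For illustration only, a minimal sketch of wiring this provider into a
    bare prompt_toolkit session (IPython normally does this itself when its
    ``autosuggestions_provider`` option selects this class):

    >>> from prompt_toolkit.shortcuts import PromptSession
    >>> provider = NavigableAutoSuggestFromHistory()
    >>> session = PromptSession(auto_suggest=provider)
    >>> provider.connect(session)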
"""
skip_lines: int
_connected_apps: list[PromptSession]
    # Handle to the currently running llm task that appends suggestions to the
    # current buffer; we keep a handle to it in order to cancel it when there
    # is a cursor movement or another request.
_llm_task: asyncio.Task | None = None
# This is the instance of the LLM provider from jupyter-ai to which we forward the request
# to generate inline completions.
_llm_provider: Any | None
def __init__(self):
super().__init__()
self.skip_lines = 0
self._connected_apps = []
self._llm_provider = None
def reset_history_position(self, _: Buffer):
self.skip_lines = 0
def disconnect(self) -> None:
self._cancel_running_llm_task()
for pt_app in self._connected_apps:
text_insert_event = pt_app.default_buffer.on_text_insert
text_insert_event.remove_handler(self.reset_history_position)
def connect(self, pt_app: PromptSession):
self._connected_apps.append(pt_app)
# note: `on_text_changed` could be used for a bit different behaviour
# on character deletion (i.e. resetting history position on backspace)
pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position)
pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss)
def get_suggestion(
self, buffer: Buffer, document: Document
) -> Optional[Suggestion]:
text = _get_query(document)
if text.strip():
for suggestion, _ in self._find_next_match(
text, self.skip_lines, buffer.history
):
return Suggestion(suggestion)
return None
def _dismiss(self, buffer, *args, **kwargs) -> None:
self._cancel_running_llm_task()
buffer.suggestion = None
def _find_match(
self, text: str, skip_lines: float, history: History, previous: bool
) -> Generator[Tuple[str, float], None, None]:
"""
        Parameters
        ----------
        text : str
            Text content to find a match for; the user's cursor is most of the
            time at the end of this text.
        skip_lines : float
            Number of items to skip in the search; this is used to indicate how
            far in the list the user has navigated by pressing up or down.
            The float type is used because the base value is +inf.
        history : History
            prompt_toolkit History instance to fetch previous entries from.
        previous : bool
            Direction of the search: whether we are looking for the previous
            match (True) or the next match (False).
Yields
------
        Tuple with:
        str:
            current suggestion.
        float:
            the line number of the match; in practice only ints are yielded,
            but the value is passed back via ``skip_lines``, which may be
            +inf (a float).
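
        For illustration (a hypothetical history, not from the source): with a
        history containing ``"import os"`` and then ``"import sys"``, searching
        forward for ``text="import "`` with ``skip_lines=0`` yields
        ``("sys", 0)`` first (newest entry first), then ``("os", 1)``.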
"""
line_number = -1
for string in reversed(list(history.get_strings())):
for line in reversed(string.splitlines()):
line_number += 1
if not previous and line_number < skip_lines:
continue
# do not return empty suggestions as these
# close the auto-suggestion overlay (and are useless)
if line.startswith(text) and len(line) > len(text):
yield line[len(text) :], line_number
if previous and line_number >= skip_lines:
return
def _find_next_match(
self, text: str, skip_lines: float, history: History
) -> Generator[Tuple[str, float], None, None]:
return self._find_match(text, skip_lines, history, previous=False)
def _find_previous_match(self, text: str, skip_lines: float, history: History):
return reversed(
list(self._find_match(text, skip_lines, history, previous=True))
)
def up(self, query: str, other_than: str, history: History) -> None:
self._cancel_running_llm_task()
for suggestion, line_number in self._find_next_match(
query, self.skip_lines, history
):
# if user has history ['very.a', 'very', 'very.b'] and typed 'very'
# we want to switch from 'very.b' to 'very.a' because a) if the
# suggestion equals current text, prompt-toolkit aborts suggesting
# b) user likely would not be interested in 'very' anyways (they
# already typed it).
if query + suggestion != other_than:
self.skip_lines = line_number
break
else:
# no matches found, cycle back to beginning
self.skip_lines = 0
def down(self, query: str, other_than: str, history: History) -> None:
self._cancel_running_llm_task()
for suggestion, line_number in self._find_previous_match(
query, self.skip_lines, history
):
if query + suggestion != other_than:
self.skip_lines = line_number
break
else:
# no matches found, cycle to end
for suggestion, line_number in self._find_previous_match(
query, float("Inf"), history
):
if query + suggestion != other_than:
self.skip_lines = line_number
break
def _cancel_running_llm_task(self) -> None:
"""
        Try to cancel the currently running llm_task if one exists, and set it to None.
"""
if self._llm_task is not None:
if self._llm_task.done():
self._llm_task = None
return
cancelled = self._llm_task.cancel()
if cancelled:
self._llm_task = None
if not cancelled:
warnings.warn(
"LLM task not cancelled, does your provider support cancellation?"
)
async def _trigger_llm(self, buffer) -> None:
"""
        Ask the current llm provider for a suggestion for the current buffer.
        If there is a currently running llm task, cancel it first.
"""
# we likely want to store the current cursor position, and cancel if the cursor has moved.
if not self._llm_provider:
warnings.warn("No LLM provider found, cannot trigger LLM completions")
return
        if jai_models is None:
            warnings.warn(
                "LLM Completion requires `jupyter_ai_magics` and `jupyter_ai` to be installed"
            )
            return
self._cancel_running_llm_task()
async def error_catcher(buffer):
"""
            Catch and log any errors, as otherwise they are just
            lost in the void of the future running task.
"""
try:
await self._trigger_llm_core(buffer)
except Exception as e:
                get_ipython().log.error("Error while triggering LLM completion: %s", e)
raise
        # here we need a cancellable task, so we can't just await the error catcher directly
self._llm_task = asyncio.create_task(error_catcher(buffer))
await self._llm_task
async def _trigger_llm_core(self, buffer: Buffer):
"""
        This is the core of the current llm request.

        Here we build a compatible `InlineCompletionRequest` and ask the llm
        provider to stream its response back to us, iteratively setting it as
        the suggestion on the current buffer.

        Unlike with Jupyter AI, we do not have multiple cells, so the cell
        number is always set to `0`; note that we _could_ set it to a new
        number each time and ignore the replies from past numbers.

        We set the prefix to the current cell content, but could also insert
        the rest of the history, or even just the history entries that did not
        fail. In the same way, we do not have a cell id.

        An LLM provider may return multiple suggestion streams, but for the
        time being we only support one.

        Here we make the assumption that the provider has
        `stream_inline_completions`; I'm not sure that is the case for all
        providers.
"""
request = jai_models.InlineCompletionRequest(
number=0,
prefix=buffer.document.text,
suffix="",
mime="text/x-python",
stream=True,
path=None,
language="python",
cell_id=None,
)
async for reply_and_chunks in self._llm_provider.stream_inline_completions(
request
):
if isinstance(reply_and_chunks, jai_models.InlineCompletionReply):
if len(reply_and_chunks.list.items) > 1:
raise ValueError(
"Terminal IPython cannot deal with multiple LLM suggestions at once"
)
buffer.suggestion = Suggestion(
reply_and_chunks.list.items[0].insertText
)
buffer.on_suggestion_set.fire()
elif isinstance(reply_and_chunks, jai_models.InlineCompletionStreamChunk):
buffer.suggestion = Suggestion(reply_and_chunks.response.insertText)
buffer.on_suggestion_set.fire()
return
_MIN_LINES = 5
async def llm_autosuggestion(event: KeyPressEvent):
"""
    Ask the history auto-suggester to delegate to an LLM for a completion of
    the current buffer.

    This will first make sure that the current buffer has at least `_MIN_LINES`
    (5) lines available from the cursor row down to insert the LLM completion.

    Provisional as of IPython 8.32; may change without warning.
"""
provider = get_ipython().auto_suggest
if not isinstance(provider, NavigableAutoSuggestFromHistory):
return
doc = event.current_buffer.document
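    # Ensure at least _MIN_LINES rows exist from the cursor row to the end of
    # the buffer; e.g. (hypothetically) a 2-line buffer with the cursor on
    # row 0 needs 3 extra blank lines.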
lines_to_insert = max(0, _MIN_LINES - doc.line_count + doc.cursor_position_row)
for _ in range(lines_to_insert):
event.current_buffer.insert_text("\n", move_cursor=False)
await provider._trigger_llm(event.current_buffer)
def accept_or_jump_to_end(event: KeyPressEvent):
"""Apply autosuggestion or jump to end of line."""
buffer = event.current_buffer
d = buffer.document
after_cursor = d.text[d.cursor_position :]
lines = after_cursor.split("\n")
end_of_current_line = lines[0].strip()
suggestion = buffer.suggestion
if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""):
buffer.insert_text(suggestion.text)
else:
nc.end_of_line(event)
def _deprected_accept_in_vi_insert_mode(event: KeyPressEvent):
"""Accept autosuggestion or jump to end of line.
.. deprecated:: 8.12
Use `accept_or_jump_to_end` instead.
"""
return accept_or_jump_to_end(event)
def accept(event: KeyPressEvent):
"""Accept autosuggestion"""
buffer = event.current_buffer
suggestion = buffer.suggestion
if suggestion:
buffer.insert_text(suggestion.text)
else:
nc.forward_char(event)
def discard(event: KeyPressEvent):
"""Discard autosuggestion"""
buffer = event.current_buffer
buffer.suggestion = None
def accept_word(event: KeyPressEvent):
"""Fill partial autosuggestion by word"""
buffer = event.current_buffer
suggestion = buffer.suggestion
if suggestion:
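        # e.g. for a suggestion of "foo bar", this inserts "foo " (the first
        # word plus the whitespace that follows it)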
t = re.split(r"(\S+\s+)", suggestion.text)
buffer.insert_text(next((x for x in t if x), ""))
else:
nc.forward_word(event)
def accept_character(event: KeyPressEvent):
"""Fill partial autosuggestion by character"""
b = event.current_buffer
suggestion = b.suggestion
if suggestion and suggestion.text:
b.insert_text(suggestion.text[0])
def accept_and_keep_cursor(event: KeyPressEvent):
"""Accept autosuggestion and keep cursor in place"""
buffer = event.current_buffer
old_position = buffer.cursor_position
suggestion = buffer.suggestion
if suggestion:
buffer.insert_text(suggestion.text)
buffer.cursor_position = old_position
def accept_and_move_cursor_left(event: KeyPressEvent):
"""Accept autosuggestion and move cursor left in place"""
accept_and_keep_cursor(event)
nc.backward_char(event)
def _update_hint(buffer: Buffer):
if buffer.auto_suggest:
suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document)
buffer.suggestion = suggestion
def backspace_and_resume_hint(event: KeyPressEvent):
"""Resume autosuggestions after deleting last character"""
nc.backward_delete_char(event)
_update_hint(event.current_buffer)
def resume_hinting(event: KeyPressEvent):
"""Resume autosuggestions"""
pass_through.reply(event)
# Order matters: if update happened first and event reply second, the
# suggestion would be auto-accepted if both actions are bound to same key.
_update_hint(event.current_buffer)
def up_and_update_hint(event: KeyPressEvent):
"""Go up and update hint"""
current_buffer = event.current_buffer
current_buffer.auto_up(count=event.arg)
_update_hint(current_buffer)
def down_and_update_hint(event: KeyPressEvent):
"""Go down and update hint"""
current_buffer = event.current_buffer
current_buffer.auto_down(count=event.arg)
_update_hint(current_buffer)
def accept_token(event: KeyPressEvent):
"""Fill partial autosuggestion by token"""
b = event.current_buffer
suggestion = b.suggestion
if suggestion:
prefix = _get_query(b.document)
text = prefix + suggestion.text
tokens: List[Optional[str]] = [None, None, None]
substrings = [""]
i = 0
for token in generate_tokens(StringIO(text).readline):
if token.type == tokenize.NEWLINE:
index = len(text)
else:
index = text.index(token[1], len(substrings[-1]))
substrings.append(text[:index])
tokenized_so_far = substrings[-1]
if tokenized_so_far.startswith(prefix):
if i == 0 and len(tokenized_so_far) > len(prefix):
tokens[0] = tokenized_so_far[len(prefix) :]
substrings.append(tokenized_so_far)
i += 1
tokens[i] = token[1]
if i == 2:
break
i += 1
if tokens[0]:
to_insert: str
insert_text = substrings[-2]
if tokens[1] and len(tokens[1]) == 1:
insert_text = substrings[-1]
to_insert = insert_text[len(prefix) :]
b.insert_text(to_insert)
return
nc.forward_word(event)
Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]
def _swap_autosuggestion(
buffer: Buffer,
provider: NavigableAutoSuggestFromHistory,
direction_method: Callable,
):
"""
    We skip the most recent history entry (in either direction) if it equals
    the current autosuggestion, because if the user cycles while an
    auto-suggestion is shown, they most likely want something other than what
    was suggested (otherwise they would have accepted the suggestion).
"""
suggestion = buffer.suggestion
if not suggestion:
return
query = _get_query(buffer.document)
current = query + suggestion.text
direction_method(query=query, other_than=current, history=buffer.history)
new_suggestion = provider.get_suggestion(buffer, buffer.document)
buffer.suggestion = new_suggestion
def swap_autosuggestion_up(event: KeyPressEvent):
"""Get next autosuggestion from history."""
shell = get_ipython()
provider = shell.auto_suggest
if not isinstance(provider, NavigableAutoSuggestFromHistory):
return
return _swap_autosuggestion(
buffer=event.current_buffer, provider=provider, direction_method=provider.up
)
def swap_autosuggestion_down(event: KeyPressEvent):
"""Get previous autosuggestion from history."""
shell = get_ipython()
provider = shell.auto_suggest
if not isinstance(provider, NavigableAutoSuggestFromHistory):
return
return _swap_autosuggestion(
buffer=event.current_buffer,
provider=provider,
direction_method=provider.down,
)
def __getattr__(key):
if key == "accept_in_vi_insert_mode":
warnings.warn(
"`accept_in_vi_insert_mode` is deprecated since IPython 8.12 and "
"renamed to `accept_or_jump_to_end`. Please update your configuration "
"accordingly",
DeprecationWarning,
stacklevel=2,
)
return _deprected_accept_in_vi_insert_mode
raise AttributeError