import re
import asyncio
import tokenize
from io import StringIO
from typing import Callable, List, Optional, Union, Generator, Tuple, ClassVar, Any
import warnings

import prompt_toolkit
from prompt_toolkit.buffer import Buffer
from prompt_toolkit.key_binding import KeyPressEvent
from prompt_toolkit.key_binding.bindings import named_commands as nc
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion, AutoSuggest
from prompt_toolkit.document import Document
from prompt_toolkit.history import History
from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.layout.processors import (
    Processor,
    Transformation,
    TransformationInput,
)

from IPython.core.getipython import get_ipython
from IPython.utils.tokenutil import generate_tokens

from .filters import pass_through

try:
    import jupyter_ai_magics
    import jupyter_ai.completions.models as jai_models
except ModuleNotFoundError:
    jai_models = None


def _get_query(document: Document):
    """Return the text of the line the cursor is currently on."""
    return document.lines[document.cursor_position_row]


class AppendAutoSuggestionInAnyLine(Processor):
    """
    Append the auto suggestion to lines other than the last (appending to the
    last line is natively supported by prompt toolkit).

    This has a private `_debug` attribute that can be set to True to display
    debug information as a virtual suggestion at the end of any line. You can
    do so with:

        >>> from IPython.terminal.shortcuts.auto_suggest import AppendAutoSuggestionInAnyLine
        >>> AppendAutoSuggestionInAnyLine._debug = True

    """

    _debug: ClassVar[bool] = False

    def __init__(self, style: str = "class:auto-suggestion") -> None:
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Apply transformation to the line that is currently being edited.

        This is a variation of the original implementation in prompt toolkit
        that allows us not only to append suggestions to any line, but also
        to show multi-line suggestions.

        As transformations are applied on a line-by-line basis, we need to
        cheat a bit and elide any line that comes after the line we are
        currently editing, until we run out of completions; we cannot shift
        the existing lines.

        There are multiple cases to handle:

        The completion ends before the end of the buffer:
            We can resume showing the normal line, and say that some code may
            be hidden.

        The completion ends at the end of the buffer:
            We can just say that some code may be hidden.

        And separately:

        The completion ends beyond the end of the buffer:
            We need to both say that some code may be hidden, and that some
            lines are not shown.
        """
        last_line_number = ti.document.line_count - 1
        is_last_line = ti.lineno == last_line_number

        # In debug mode, append the reason for the no-op as a virtual
        # suggestion at the end of the line; otherwise leave the line as-is.
        noop = lambda text: Transformation(
            fragments=ti.fragments + [(self.style, " " + text if self._debug else "")]
        )
        if ti.document.line_count == 1:
            return noop("noop:oneline")
        if ti.document.cursor_position_row == last_line_number and is_last_line:
            # prompt toolkit already appends something; just leave it be
            return noop("noop:last line and cursor")

        # First: everything before the cursor line is unchanged.
        if ti.lineno < ti.document.cursor_position_row:
            return noop("noop:before cursor")

        buffer = ti.buffer_control.buffer
        if not buffer.suggestion or not ti.document.is_cursor_at_the_end_of_line:
            return noop("noop:not eol")

        # How far below the cursor line the line currently being rendered is.
        delta = ti.lineno - ti.document.cursor_position_row
        suggestions = buffer.suggestion.text.splitlines()

        if len(suggestions) == 0:
            return noop("noop: no suggestions")

        suggestions_longer_than_buffer: bool = (
            len(suggestions) + ti.document.cursor_position_row > ti.document.line_count
        )

        if len(suggestions) >= 1 and prompt_toolkit.VERSION < (3, 0, 49):
            if ti.lineno == ti.document.cursor_position_row:
                return Transformation(
                    fragments=ti.fragments
                    + [
                        (
                            "red",
                            "(Cannot show multiline suggestion; requires prompt_toolkit >= 3.0.49)",
                        )
                    ]
                )
            else:
                return Transformation(fragments=ti.fragments)
        if delta == 0:
            suggestion = suggestions[0]
            return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
        if is_last_line:
            if delta < len(suggestions):
                hidden = len(suggestions) - delta
                suggestion = f"… rest of suggestion ({hidden} lines) and code hidden"
                return Transformation([(self.style, suggestion)])

            # Count how many lines are really elided: trailing empty lines in
            # the buffer do not count as hidden content.
            n_elided = len(suggestions)
            for i in range(len(suggestions)):
                ll = ti.get_line(last_line_number - i)
                el = "".join(l[1] for l in ll).strip()
                if el:
                    break
                else:
                    n_elided -= 1
            if n_elided:
                return Transformation([(self.style, f"… {n_elided} line(s) hidden")])
            else:
                return Transformation(
                    ti.get_line(last_line_number - len(suggestions) + 1)
                    + ([(self.style, "shift-last-line")] if self._debug else [])
                )

        elif delta < len(suggestions):
            # Show the suggestion's continuation in place of this buffer line.
            suggestion = suggestions[delta]
            return Transformation([(self.style, suggestion)])
        else:
            # Past the end of the suggestion: show the buffer line that lands
            # here once the suggestion lines are accounted for.
            shift = ti.lineno - len(suggestions) + 1
            return Transformation(ti.get_line(shift))


class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory):
    """
    A subclass of AutoSuggestFromHistory that allows navigation to the
    next/previous suggestion from history. To do so it remembers the current
    position, but its state needs to be carefully cleared on the right events.
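
    A rough usage sketch (the IPython shell normally wires this up itself;
    creating the `PromptSession` manually here is only for illustration):

        >>> from prompt_toolkit.shortcuts import PromptSession
        >>> provider = NavigableAutoSuggestFromHistory()
        >>> session = PromptSession(auto_suggest=provider)
        >>> provider.connect(session)  # attach buffer event handlers
        >>> # ... interact; `up`/`down` cycle through the suggestions ...
        >>> provider.disconnect()  # detach handlers, cancel any LLM task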
    """

    skip_lines: int
    _connected_apps: list[PromptSession]

    # Handle to the currently running llm task that appends suggestions to the
    # current buffer; we keep a reference to it in order to cancel it when
    # there is a cursor movement or another request.
    _llm_task: asyncio.Task | None = None

    # This is the instance of the LLM provider from jupyter-ai to which we forward the request
    # to generate inline completions.
    _llm_provider: Any | None

    def __init__(self):
        super().__init__()
        self.skip_lines = 0
        self._connected_apps = []
        self._llm_provider = None

    def reset_history_position(self, _: Buffer):
        self.skip_lines = 0

    def disconnect(self) -> None:
        self._cancel_running_llm_task()
        for pt_app in self._connected_apps:
            text_insert_event = pt_app.default_buffer.on_text_insert
            text_insert_event.remove_handler(self.reset_history_position)
            cursor_moved_event = pt_app.default_buffer.on_cursor_position_changed
            cursor_moved_event.remove_handler(self._dismiss)

    def connect(self, pt_app: PromptSession):
        self._connected_apps.append(pt_app)
        # note: `on_text_changed` could be used for a bit different behaviour
        # on character deletion (i.e. resetting history position on backspace)
        pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position)
        pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss)

    def get_suggestion(
        self, buffer: Buffer, document: Document
    ) -> Optional[Suggestion]:
        text = _get_query(document)

        if text.strip():
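            # Take the first match only; `_find_next_match` returns a
            # generator, so the search stops at the first suggestion found.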
            for suggestion, _ in self._find_next_match(
                text, self.skip_lines, buffer.history
            ):
                return Suggestion(suggestion)

        return None

    def _dismiss(self, buffer, *args, **kwargs) -> None:
        self._cancel_running_llm_task()
        buffer.suggestion = None

    def _find_match(
        self, text: str, skip_lines: float, history: History, previous: bool
    ) -> Generator[Tuple[str, float], None, None]:
        """
        Find matches for the given text in the history.

        Parameters
        ----------
        text : str
            Text content to find a match for; the user's cursor is most of
            the time at the end of this text.
        skip_lines : float
            Number of items to skip in the search; this is used to indicate
            how far in the list the user has navigated by pressing up or
            down. The float type is used because the base value is +inf.
        history : History
            prompt_toolkit History instance to fetch previous entries from.
        previous : bool
            Direction of the search: whether we are looking for the previous
            match (True) or the next match (False).

        Yields
        ------
        Tuple with:
        str :
            The current suggestion.
        float :
            The line number of the match; in practice always an int, which is
            passed back via `skip_lines` (a float, as it may be +inf).
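
        Example
        -------
        Assuming ``history.get_strings()`` returns entries oldest-first (as
        prompt_toolkit's `History` does), with entries ``["a = 1", "a = 2"]``
        and ``text="a"``, a forward search (``previous=False``,
        ``skip_lines=0``) yields ``(" = 2", 0)`` for the most recent entry
        first, then ``(" = 1", 1)``.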


        """
        line_number = -1
        for string in reversed(list(history.get_strings())):
            for line in reversed(string.splitlines()):
                line_number += 1
                if not previous and line_number < skip_lines:
                    continue
                # do not return empty suggestions as these
                # close the auto-suggestion overlay (and are useless)
                if line.startswith(text) and len(line) > len(text):
                    yield line[len(text) :], line_number
                if previous and line_number >= skip_lines:
                    return

    def _find_next_match(
        self, text: str, skip_lines: float, history: History
    ) -> Generator[Tuple[str, float], None, None]:
        return self._find_match(text, skip_lines, history, previous=False)

    def _find_previous_match(self, text: str, skip_lines: float, history: History):
        return reversed(
            list(self._find_match(text, skip_lines, history, previous=True))
        )

    def up(self, query: str, other_than: str, history: History) -> None:
        """Cycle one match further back in history (older entries)."""
        self._cancel_running_llm_task()
        for suggestion, line_number in self._find_next_match(
            query, self.skip_lines, history
        ):
            # if user has history ['very.a', 'very', 'very.b'] and typed 'very'
            # we want to switch from 'very.b' to 'very.a' because a) if the
            # suggestion equals current text, prompt-toolkit aborts suggesting
            # b) user likely would not be interested in 'very' anyways (they
            # already typed it).
            if query + suggestion != other_than:
                self.skip_lines = line_number
                break
        else:
            # no matches found, cycle back to beginning
            self.skip_lines = 0

    def down(self, query: str, other_than: str, history: History) -> None:
        """Cycle one match forward in history (more recent entries)."""
        self._cancel_running_llm_task()
        for suggestion, line_number in self._find_previous_match(
            query, self.skip_lines, history
        ):
            if query + suggestion != other_than:
                self.skip_lines = line_number
                break
        else:
            # no matches found, cycle to end
            for suggestion, line_number in self._find_previous_match(
                query, float("Inf"), history
            ):
                if query + suggestion != other_than:
                    self.skip_lines = line_number
                    break

    def _cancel_running_llm_task(self) -> None:
        """
        Try to cancel the currently running llm task, if any, and set it to
        None.
        """
        if self._llm_task is not None:
            if self._llm_task.done():
                self._llm_task = None
                return
            cancelled = self._llm_task.cancel()
            if cancelled:
                self._llm_task = None
            else:
                warnings.warn(
                    "LLM task not cancelled, does your provider support cancellation?"
                )

    async def _trigger_llm(self, buffer) -> None:
        """
        Ask the current llm provider for a suggestion for the current buffer.

        If there is a currently running llm task, cancel it first.
        """
        # We likely want to store the current cursor position, and cancel if
        # the cursor has moved.
        if not self._llm_provider:
            warnings.warn("No LLM provider found, cannot trigger LLM completions")
            return
        if jai_models is None:
            warnings.warn(
                "LLM Completion requires `jupyter_ai_magics` and `jupyter_ai` to be installed"
            )
            return

        self._cancel_running_llm_task()

        async def error_catcher(buffer):
            """
            Catch and log any errors, as they would otherwise just be lost in
            the void of the running future task.
            """
            try:
                await self._trigger_llm_core(buffer)
            except Exception as e:
                get_ipython().log.error("LLM completion errored: %s", e)
                raise

        # We need a cancellable task here, so we cannot simply await the
        # error catcher.
        self._llm_task = asyncio.create_task(error_catcher(buffer))
        await self._llm_task

    async def _trigger_llm_core(self, buffer: Buffer):
        """
        The core of the llm request.

        Here we build a compatible `InlineCompletionRequest` and ask the llm
        provider to stream its response back to us, iteratively setting it as
        the suggestion on the current buffer.

        Unlike with JupyterAI, we do not have multiple cells, so the cell
        number is always set to `0`; note that we _could_ set it to a new
        number each time and ignore the reply from past numbers.

        We set the prefix to the current cell content, but could also insert
        the rest of the history, or even just the non-failing history.

        In the same way, we do not have a cell id.

        The LLM provider may return multiple suggestion streams, but for the
        time being we only support one.

        We make the assumption here that the provider has
        `stream_inline_completions`; I am not sure this is the case for all
        providers.
        """

        request = jai_models.InlineCompletionRequest(
            number=0,
            prefix=buffer.document.text,
            suffix="",
            mime="text/x-python",
            stream=True,
            path=None,
            language="python",
            cell_id=None,
        )

        async for reply_and_chunks in self._llm_provider.stream_inline_completions(
            request
        ):
            if isinstance(reply_and_chunks, jai_models.InlineCompletionReply):
                if len(reply_and_chunks.list.items) > 1:
                    raise ValueError(
                        "Terminal IPython cannot deal with multiple LLM suggestions at once"
                    )
                buffer.suggestion = Suggestion(
                    reply_and_chunks.list.items[0].insertText
                )
                buffer.on_suggestion_set.fire()
            elif isinstance(reply_and_chunks, jai_models.InlineCompletionStreamChunk):
                buffer.suggestion = Suggestion(reply_and_chunks.response.insertText)
                buffer.on_suggestion_set.fire()
        return


# Minimum number of lines to keep available below the cursor before asking
# for an LLM completion, so that a multi-line suggestion has room to render.
_MIN_LINES = 5


async def llm_autosuggestion(event: KeyPressEvent):
    """
    Ask the auto-suggester to delegate to an LLM for completion.

    This will first make sure that the current buffer has at least
    `_MIN_LINES` lines available below the cursor to insert the LLM
    completion into.

    Provisional as of 8.32; may change without warning.

    """
    provider = get_ipython().auto_suggest
    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return
    doc = event.current_buffer.document
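    # e.g. for a one-line buffer with the cursor on row 0, this inserts
    # max(0, 5 - 1 + 0) == 4 blank lines below the cursor.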
    lines_to_insert = max(0, _MIN_LINES - doc.line_count + doc.cursor_position_row)
    for _ in range(lines_to_insert):
        event.current_buffer.insert_text("\n", move_cursor=False)

    await provider._trigger_llm(event.current_buffer)


def accept_or_jump_to_end(event: KeyPressEvent):
    """Apply autosuggestion or jump to end of line."""
    buffer = event.current_buffer
    d = buffer.document
    after_cursor = d.text[d.cursor_position :]
    lines = after_cursor.split("\n")
    end_of_current_line = lines[0].strip()
    suggestion = buffer.suggestion
    if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""):
        buffer.insert_text(suggestion.text)
    else:
        nc.end_of_line(event)


def _deprecated_accept_in_vi_insert_mode(event: KeyPressEvent):
    """Accept autosuggestion or jump to end of line.

    .. deprecated:: 8.12
        Use `accept_or_jump_to_end` instead.
    """
    return accept_or_jump_to_end(event)


def accept(event: KeyPressEvent):
    """Accept autosuggestion"""
    buffer = event.current_buffer
    suggestion = buffer.suggestion
    if suggestion:
        buffer.insert_text(suggestion.text)
    else:
        nc.forward_char(event)


def discard(event: KeyPressEvent):
    """Discard autosuggestion"""
    buffer = event.current_buffer
    buffer.suggestion = None


def accept_word(event: KeyPressEvent):
    """Fill partial autosuggestion by word"""
    buffer = event.current_buffer
    suggestion = buffer.suggestion
    if suggestion:
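        # The capturing group makes `re.split` keep each delimiter, e.g.
        # "foo bar" -> ['', 'foo ', 'bar']; the first non-empty chunk is the
        # next word together with its trailing whitespace.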
        t = re.split(r"(\S+\s+)", suggestion.text)
        buffer.insert_text(next((x for x in t if x), ""))
    else:
        nc.forward_word(event)


def accept_character(event: KeyPressEvent):
    """Fill partial autosuggestion by character"""
    b = event.current_buffer
    suggestion = b.suggestion
    if suggestion and suggestion.text:
        b.insert_text(suggestion.text[0])


def accept_and_keep_cursor(event: KeyPressEvent):
    """Accept autosuggestion and keep cursor in place"""
    buffer = event.current_buffer
    old_position = buffer.cursor_position
    suggestion = buffer.suggestion
    if suggestion:
        buffer.insert_text(suggestion.text)
        buffer.cursor_position = old_position


def accept_and_move_cursor_left(event: KeyPressEvent):
    """Accept autosuggestion and move cursor left in place"""
    accept_and_keep_cursor(event)
    nc.backward_char(event)


def _update_hint(buffer: Buffer):
    if buffer.auto_suggest:
        suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document)
        buffer.suggestion = suggestion


def backspace_and_resume_hint(event: KeyPressEvent):
    """Resume autosuggestions after deleting last character"""
    nc.backward_delete_char(event)
    _update_hint(event.current_buffer)


def resume_hinting(event: KeyPressEvent):
    """Resume autosuggestions"""
    pass_through.reply(event)
    # Order matters: if update happened first and event reply second, the
    # suggestion would be auto-accepted if both actions are bound to same key.
    _update_hint(event.current_buffer)


def up_and_update_hint(event: KeyPressEvent):
    """Go up and update hint"""
    current_buffer = event.current_buffer

    current_buffer.auto_up(count=event.arg)
    _update_hint(current_buffer)


def down_and_update_hint(event: KeyPressEvent):
    """Go down and update hint"""
    current_buffer = event.current_buffer

    current_buffer.auto_down(count=event.arg)
    _update_hint(current_buffer)


def accept_token(event: KeyPressEvent):
    """Fill partial autosuggestion by token"""
    b = event.current_buffer
    suggestion = b.suggestion
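    # For instance, with "df" typed and ".sum(axis=1)" suggested, the first
    # call inserts "." and a second call inserts "sum(": a single-character
    # token is merged with the token that follows it.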

    if suggestion:
        prefix = _get_query(b.document)
        text = prefix + suggestion.text

        # Tokenize what was typed plus the suggestion and collect up to three
        # tokens starting at the first token boundary past the prefix.
        tokens: List[Optional[str]] = [None, None, None]
        substrings = [""]
        i = 0

        for token in generate_tokens(StringIO(text).readline):
            if token.type == tokenize.NEWLINE:
                index = len(text)
            else:
                index = text.index(token[1], len(substrings[-1]))
            substrings.append(text[:index])
            tokenized_so_far = substrings[-1]
            if tokenized_so_far.startswith(prefix):
                if i == 0 and len(tokenized_so_far) > len(prefix):
                    tokens[0] = tokenized_so_far[len(prefix) :]
                    substrings.append(tokenized_so_far)
                    i += 1
                tokens[i] = token[1]
                if i == 2:
                    break
                i += 1

        if tokens[0]:
            to_insert: str
            insert_text = substrings[-2]
            if tokens[1] and len(tokens[1]) == 1:
                insert_text = substrings[-1]
            to_insert = insert_text[len(prefix) :]
            b.insert_text(to_insert)
            return

    nc.forward_word(event)


Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]


def _swap_autosuggestion(
    buffer: Buffer,
    provider: NavigableAutoSuggestFromHistory,
    direction_method: Callable,
):
    """
    Skip the most recent history entry (in either direction) if it equals the
    current autosuggestion, because if the user cycles while a suggestion is
    shown they most likely want something other than what was suggested
    (otherwise they would have accepted the suggestion).
    """
    suggestion = buffer.suggestion
    if not suggestion:
        return

    query = _get_query(buffer.document)
    current = query + suggestion.text

    direction_method(query=query, other_than=current, history=buffer.history)

    new_suggestion = provider.get_suggestion(buffer, buffer.document)
    buffer.suggestion = new_suggestion


def swap_autosuggestion_up(event: KeyPressEvent):
    """Get next autosuggestion from history."""
    shell = get_ipython()
    provider = shell.auto_suggest

    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return

    return _swap_autosuggestion(
        buffer=event.current_buffer, provider=provider, direction_method=provider.up
    )


def swap_autosuggestion_down(event: KeyPressEvent):
    """Get previous autosuggestion from history."""
    shell = get_ipython()
    provider = shell.auto_suggest

    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return

    return _swap_autosuggestion(
        buffer=event.current_buffer,
        provider=provider,
        direction_method=provider.down,
    )


# PEP 562 module-level ``__getattr__``: keep the deprecated name
# `accept_in_vi_insert_mode` importable while emitting a warning.
def __getattr__(key):
    if key == "accept_in_vi_insert_mode":
        warnings.warn(
            "`accept_in_vi_insert_mode` is deprecated since IPython 8.12 and "
            "renamed to `accept_or_jump_to_end`. Please update your configuration "
            "accordingly.",
            DeprecationWarning,
            stacklevel=2,
        )
        return _deprecated_accept_in_vi_insert_mode
    raise AttributeError(f"module {__name__!r} has no attribute {key!r}")