aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/ipython/py3/IPython/core/completerlib.py
diff options
context:
space:
mode:
authornkozlovskiy <nmk@ydb.tech>2023-09-29 12:24:06 +0300
committernkozlovskiy <nmk@ydb.tech>2023-09-29 12:41:34 +0300
commite0e3e1717e3d33762ce61950504f9637a6e669ed (patch)
treebca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/ipython/py3/IPython/core/completerlib.py
parent38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff)
downloadydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz
add ydb deps
Diffstat (limited to 'contrib/python/ipython/py3/IPython/core/completerlib.py')
-rw-r--r--contrib/python/ipython/py3/IPython/core/completerlib.py418
1 files changed, 418 insertions, 0 deletions
diff --git a/contrib/python/ipython/py3/IPython/core/completerlib.py b/contrib/python/ipython/py3/IPython/core/completerlib.py
new file mode 100644
index 0000000000..65efa42254
--- /dev/null
+++ b/contrib/python/ipython/py3/IPython/core/completerlib.py
@@ -0,0 +1,418 @@
+# encoding: utf-8
+"""Implementations for various useful completers.
+
+These are all loaded by default by IPython.
+"""
+#-----------------------------------------------------------------------------
+# Copyright (C) 2010-2011 The IPython Development Team.
+#
+# Distributed under the terms of the BSD License.
+#
+# The full license is in the file COPYING.txt, distributed with this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+# Stdlib imports
+import glob
+import inspect
+import itertools
+import os
+import re
+import sys
+from importlib import import_module
+from importlib.machinery import all_suffixes
+
+
+# Third-party imports
+from time import time
+from zipimport import zipimporter
+
+# Our own imports
+from .completer import expand_user, compress_user
+from .error import TryNext
+from ..utils._process_common import arg_split
+
+# FIXME: this should be pulled in with the right call via the component system
+from IPython import get_ipython
+
+from typing import List
+
+from __res import importer
+
+#-----------------------------------------------------------------------------
+# Globals and constants
+#-----------------------------------------------------------------------------
+_suffixes = all_suffixes()
+
+# Time in seconds after which the rootmodules will be stored permanently in the
+# ipython ip.db database (kept in the user's .ipython dir).
+TIMEOUT_STORAGE = 2
+
+# Time in seconds after which we give up
+TIMEOUT_GIVEUP = 20
+
+# Regular expression for the python import statement
+import_re = re.compile(r'(?P<name>[^\W\d]\w*?)'
+ r'(?P<package>[/\\]__init__)?'
+ r'(?P<suffix>%s)$' %
+ r'|'.join(re.escape(s) for s in _suffixes))
+
+# RE for the ipython %run command (python + ipython scripts)
+magic_run_re = re.compile(r'.*(\.ipy|\.ipynb|\.py[w]?)$')
+
+#-----------------------------------------------------------------------------
+# Local utilities
+#-----------------------------------------------------------------------------
+
+arcadia_rootmodules_cache = None
+arcadia_modules_cache = None
+
+
+def arcadia_init_cache():
+ global arcadia_rootmodules_cache, arcadia_modules_cache
+ arcadia_rootmodules_cache = set()
+ arcadia_modules_cache = {}
+
+ all_modules = itertools.chain(
+ sys.builtin_module_names,
+ importer.memory
+ )
+
+ for name in all_modules:
+ path = name.split('.')
+ arcadia_rootmodules_cache.add(path[0])
+
+ prefix = path[0]
+ for element in path[1:]:
+ if element == '__init__':
+ continue
+
+ arcadia_modules_cache.setdefault(prefix, set()).add(element)
+ prefix += '.' + element
+
+ arcadia_rootmodules_cache = sorted(arcadia_rootmodules_cache)
+ arcadia_modules_cache = {k: sorted(v) for k, v in arcadia_modules_cache.items()}
+
+
+def arcadia_module_list(mod):
+ if arcadia_modules_cache is None:
+ arcadia_init_cache()
+
+ return arcadia_modules_cache.get(mod, ())
+
+
+def arcadia_get_root_modules():
+ if arcadia_rootmodules_cache is None:
+ arcadia_init_cache()
+
+ return arcadia_rootmodules_cache
+
+
+def module_list(path):
+ """
+ Return the list containing the names of the modules available in the given
+ folder.
+ """
+ # sys.path has the cwd as an empty string, but isdir/listdir need it as '.'
+ if path == '':
+ path = '.'
+
+ # A few local constants to be used in loops below
+ pjoin = os.path.join
+
+ if os.path.isdir(path):
+ # Build a list of all files in the directory and all files
+ # in its subdirectories. For performance reasons, do not
+ # recurse more than one level into subdirectories.
+ files = []
+ for root, dirs, nondirs in os.walk(path, followlinks=True):
+ subdir = root[len(path)+1:]
+ if subdir:
+ files.extend(pjoin(subdir, f) for f in nondirs)
+ dirs[:] = [] # Do not recurse into additional subdirectories.
+ else:
+ files.extend(nondirs)
+
+ else:
+ try:
+ files = list(zipimporter(path)._files.keys())
+ except:
+ files = []
+
+ # Build a list of modules which match the import_re regex.
+ modules = []
+ for f in files:
+ m = import_re.match(f)
+ if m:
+ modules.append(m.group('name'))
+ return list(set(modules))
+
+
+def get_root_modules():
+ """
+ Returns a list containing the names of all the modules available in the
+ folders of the pythonpath.
+
+ ip.db['rootmodules_cache'] maps sys.path entries to list of modules.
+ """
+ ip = get_ipython()
+ if ip is None:
+ # No global shell instance to store cached list of modules.
+ # Don't try to scan for modules every time.
+ return list(sys.builtin_module_names)
+
+ rootmodules_cache = ip.db.get('rootmodules_cache', {})
+ rootmodules = list(sys.builtin_module_names)
+ start_time = time()
+ store = False
+ for path in sys.path:
+ try:
+ modules = rootmodules_cache[path]
+ except KeyError:
+ modules = module_list(path)
+ try:
+ modules.remove('__init__')
+ except ValueError:
+ pass
+ if path not in ('', '.'): # cwd modules should not be cached
+ rootmodules_cache[path] = modules
+ if time() - start_time > TIMEOUT_STORAGE and not store:
+ store = True
+ print("\nCaching the list of root modules, please wait!")
+ print("(This will only be done once - type '%rehashx' to "
+ "reset cache!)\n")
+ sys.stdout.flush()
+ if time() - start_time > TIMEOUT_GIVEUP:
+ print("This is taking too long, we give up.\n")
+ return []
+ rootmodules.extend(modules)
+ if store:
+ ip.db['rootmodules_cache'] = rootmodules_cache
+ rootmodules = list(set(rootmodules))
+ return rootmodules
+
+
+def is_importable(module, attr, only_modules):
+ if only_modules:
+ return inspect.ismodule(getattr(module, attr))
+ else:
+ return not(attr[:2] == '__' and attr[-2:] == '__')
+
+def is_possible_submodule(module, attr):
+ try:
+ obj = getattr(module, attr)
+ except AttributeError:
+ # Is possilby an unimported submodule
+ return True
+ except TypeError:
+ # https://github.com/ipython/ipython/issues/9678
+ return False
+ return inspect.ismodule(obj)
+
+
+def try_import(mod: str, only_modules=False) -> List[str]:
+ """
+ Try to import given module and return list of potential completions.
+ """
+ mod = mod.rstrip('.')
+ try:
+ m = import_module(mod)
+ except:
+ return []
+
+ filename = getattr(m, '__file__', '')
+ m_is_init = '__init__' in (filename or '') or filename == mod
+
+ completions = []
+ if (not hasattr(m, '__file__')) or (not only_modules) or m_is_init:
+ completions.extend( [attr for attr in dir(m) if
+ is_importable(m, attr, only_modules)])
+
+ m_all = getattr(m, "__all__", [])
+ if only_modules:
+ completions.extend(attr for attr in m_all if is_possible_submodule(m, attr))
+ else:
+ completions.extend(m_all)
+
+ if m_is_init:
+ completions.extend(arcadia_module_list(mod))
+ completions_set = {c for c in completions if isinstance(c, str)}
+ completions_set.discard('__init__')
+ return sorted(completions_set)
+
+
+#-----------------------------------------------------------------------------
+# Completion-related functions.
+#-----------------------------------------------------------------------------
+
+def quick_completer(cmd, completions):
+ r""" Easily create a trivial completer for a command.
+
+ Takes either a list of completions, or all completions in string (that will
+ be split on whitespace).
+
+ Example::
+
+ [d:\ipython]|1> import ipy_completers
+ [d:\ipython]|2> ipy_completers.quick_completer('foo', ['bar','baz'])
+ [d:\ipython]|3> foo b<TAB>
+ bar baz
+ [d:\ipython]|3> foo ba
+ """
+
+ if isinstance(completions, str):
+ completions = completions.split()
+
+ def do_complete(self, event):
+ return completions
+
+ get_ipython().set_hook('complete_command',do_complete, str_key = cmd)
+
+def module_completion(line):
+ """
+ Returns a list containing the completion possibilities for an import line.
+
+ The line looks like this :
+ 'import xml.d'
+ 'from xml.dom import'
+ """
+
+ words = line.split(' ')
+ nwords = len(words)
+
+ # from whatever <tab> -> 'import '
+ if nwords == 3 and words[0] == 'from':
+ return ['import ']
+
+ # 'from xy<tab>' or 'import xy<tab>'
+ if nwords < 3 and (words[0] in {'%aimport', 'import', 'from'}) :
+ if nwords == 1:
+ return arcadia_get_root_modules()
+ mod = words[1].split('.')
+ if len(mod) < 2:
+ return arcadia_get_root_modules()
+ completion_list = try_import('.'.join(mod[:-1]), True)
+ return ['.'.join(mod[:-1] + [el]) for el in completion_list]
+
+ # 'from xyz import abc<tab>'
+ if nwords >= 3 and words[0] == 'from':
+ mod = words[1]
+ return try_import(mod)
+
+#-----------------------------------------------------------------------------
+# Completers
+#-----------------------------------------------------------------------------
+# These all have the func(self, event) signature to be used as custom
+# completers
+
+def module_completer(self,event):
+ """Give completions after user has typed 'import ...' or 'from ...'"""
+
+ # This works in all versions of python. While 2.5 has
+ # pkgutil.walk_packages(), that particular routine is fairly dangerous,
+ # since it imports *EVERYTHING* on sys.path. That is: a) very slow b) full
+ # of possibly problematic side effects.
+ # This search the folders in the sys.path for available modules.
+
+ return module_completion(event.line)
+
+# FIXME: there's a lot of logic common to the run, cd and builtin file
+# completers, that is currently reimplemented in each.
+
+def magic_run_completer(self, event):
+ """Complete files that end in .py or .ipy or .ipynb for the %run command.
+ """
+ comps = arg_split(event.line, strict=False)
+ # relpath should be the current token that we need to complete.
+ if (len(comps) > 1) and (not event.line.endswith(' ')):
+ relpath = comps[-1].strip("'\"")
+ else:
+ relpath = ''
+
+ #print("\nev=", event) # dbg
+ #print("rp=", relpath) # dbg
+ #print('comps=', comps) # dbg
+
+ lglob = glob.glob
+ isdir = os.path.isdir
+ relpath, tilde_expand, tilde_val = expand_user(relpath)
+
+ # Find if the user has already typed the first filename, after which we
+ # should complete on all files, since after the first one other files may
+ # be arguments to the input script.
+
+ if any(magic_run_re.match(c) for c in comps):
+ matches = [f.replace('\\','/') + ('/' if isdir(f) else '')
+ for f in lglob(relpath+'*')]
+ else:
+ dirs = [f.replace('\\','/') + "/" for f in lglob(relpath+'*') if isdir(f)]
+ pys = [f.replace('\\','/')
+ for f in lglob(relpath+'*.py') + lglob(relpath+'*.ipy') +
+ lglob(relpath+'*.ipynb') + lglob(relpath + '*.pyw')]
+
+ matches = dirs + pys
+
+ #print('run comp:', dirs+pys) # dbg
+ return [compress_user(p, tilde_expand, tilde_val) for p in matches]
+
+
+def cd_completer(self, event):
+ """Completer function for cd, which only returns directories."""
+ ip = get_ipython()
+ relpath = event.symbol
+
+ #print(event) # dbg
+ if event.line.endswith('-b') or ' -b ' in event.line:
+ # return only bookmark completions
+ bkms = self.db.get('bookmarks', None)
+ if bkms:
+ return bkms.keys()
+ else:
+ return []
+
+ if event.symbol == '-':
+ width_dh = str(len(str(len(ip.user_ns['_dh']) + 1)))
+ # jump in directory history by number
+ fmt = '-%0' + width_dh +'d [%s]'
+ ents = [ fmt % (i,s) for i,s in enumerate(ip.user_ns['_dh'])]
+ if len(ents) > 1:
+ return ents
+ return []
+
+ if event.symbol.startswith('--'):
+ return ["--" + os.path.basename(d) for d in ip.user_ns['_dh']]
+
+ # Expand ~ in path and normalize directory separators.
+ relpath, tilde_expand, tilde_val = expand_user(relpath)
+ relpath = relpath.replace('\\','/')
+
+ found = []
+ for d in [f.replace('\\','/') + '/' for f in glob.glob(relpath+'*')
+ if os.path.isdir(f)]:
+ if ' ' in d:
+ # we don't want to deal with any of that, complex code
+ # for this is elsewhere
+ raise TryNext
+
+ found.append(d)
+
+ if not found:
+ if os.path.isdir(relpath):
+ return [compress_user(relpath, tilde_expand, tilde_val)]
+
+ # if no completions so far, try bookmarks
+ bks = self.db.get('bookmarks',{})
+ bkmatches = [s for s in bks if s.startswith(event.symbol)]
+ if bkmatches:
+ return bkmatches
+
+ raise TryNext
+
+ return [compress_user(p, tilde_expand, tilde_val) for p in found]
+
+def reset_completer(self, event):
+ "A completer for %reset magic"
+ return '-f -s in out array dhist'.split()