aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/ipython/py3/IPython/core/displayhook.py
diff options
context:
space:
mode:
authornkozlovskiy <nmk@ydb.tech>2023-09-29 12:24:06 +0300
committernkozlovskiy <nmk@ydb.tech>2023-09-29 12:41:34 +0300
commite0e3e1717e3d33762ce61950504f9637a6e669ed (patch)
treebca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/ipython/py3/IPython/core/displayhook.py
parent38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff)
downloadydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz
add ydb deps
Diffstat (limited to 'contrib/python/ipython/py3/IPython/core/displayhook.py')
-rw-r--r--contrib/python/ipython/py3/IPython/core/displayhook.py331
1 files changed, 331 insertions, 0 deletions
diff --git a/contrib/python/ipython/py3/IPython/core/displayhook.py b/contrib/python/ipython/py3/IPython/core/displayhook.py
new file mode 100644
index 0000000000..aba4f904d8
--- /dev/null
+++ b/contrib/python/ipython/py3/IPython/core/displayhook.py
@@ -0,0 +1,331 @@
+# -*- coding: utf-8 -*-
+"""Displayhook for IPython.
+
+This defines a callable class that IPython uses for `sys.displayhook`.
+"""
+
+# Copyright (c) IPython Development Team.
+# Distributed under the terms of the Modified BSD License.
+
+import builtins as builtin_mod
+import sys
+import io as _io
+import tokenize
+
+from traitlets.config.configurable import Configurable
+from traitlets import Instance, Float
+from warnings import warn
+
+# TODO: Move the various attributes (cache_size, [others now moved]). Some
+# of these are also attributes of InteractiveShell. They should be on ONE object
+# only and the other objects should ask that one object for their values.
+
+class DisplayHook(Configurable):
+ """The custom IPython displayhook to replace sys.displayhook.
+
+ This class does many things, but the basic idea is that it is a callable
+ that gets called anytime user code returns a value.
+ """
+
+ shell = Instance('IPython.core.interactiveshell.InteractiveShellABC',
+ allow_none=True)
+ exec_result = Instance('IPython.core.interactiveshell.ExecutionResult',
+ allow_none=True)
+ cull_fraction = Float(0.2)
+
+ def __init__(self, shell=None, cache_size=1000, **kwargs):
+ super(DisplayHook, self).__init__(shell=shell, **kwargs)
+ cache_size_min = 3
+ if cache_size <= 0:
+ self.do_full_cache = 0
+ cache_size = 0
+ elif cache_size < cache_size_min:
+ self.do_full_cache = 0
+ cache_size = 0
+ warn('caching was disabled (min value for cache size is %s).' %
+ cache_size_min,stacklevel=3)
+ else:
+ self.do_full_cache = 1
+
+ self.cache_size = cache_size
+
+ # we need a reference to the user-level namespace
+ self.shell = shell
+
+ self._,self.__,self.___ = '','',''
+
+ # these are deliberately global:
+ to_user_ns = {'_':self._,'__':self.__,'___':self.___}
+ self.shell.user_ns.update(to_user_ns)
+
+ @property
+ def prompt_count(self):
+ return self.shell.execution_count
+
+ #-------------------------------------------------------------------------
+ # Methods used in __call__. Override these methods to modify the behavior
+ # of the displayhook.
+ #-------------------------------------------------------------------------
+
+ def check_for_underscore(self):
+ """Check if the user has set the '_' variable by hand."""
+ # If something injected a '_' variable in __builtin__, delete
+ # ipython's automatic one so we don't clobber that. gettext() in
+ # particular uses _, so we need to stay away from it.
+ if '_' in builtin_mod.__dict__:
+ try:
+ user_value = self.shell.user_ns['_']
+ if user_value is not self._:
+ return
+ del self.shell.user_ns['_']
+ except KeyError:
+ pass
+
+ def quiet(self):
+ """Should we silence the display hook because of ';'?"""
+ # do not print output if input ends in ';'
+
+ try:
+ cell = self.shell.history_manager.input_hist_parsed[-1]
+ except IndexError:
+ # some uses of ipshellembed may fail here
+ return False
+
+ return self.semicolon_at_end_of_expression(cell)
+
+ @staticmethod
+ def semicolon_at_end_of_expression(expression):
+ """Parse Python expression and detects whether last token is ';'"""
+
+ sio = _io.StringIO(expression)
+ tokens = list(tokenize.generate_tokens(sio.readline))
+
+ for token in reversed(tokens):
+ if token[0] in (tokenize.ENDMARKER, tokenize.NL, tokenize.NEWLINE, tokenize.COMMENT):
+ continue
+ if (token[0] == tokenize.OP) and (token[1] == ';'):
+ return True
+ else:
+ return False
+
+ def start_displayhook(self):
+ """Start the displayhook, initializing resources."""
+ pass
+
+ def write_output_prompt(self):
+ """Write the output prompt.
+
+ The default implementation simply writes the prompt to
+ ``sys.stdout``.
+ """
+ # Use write, not print which adds an extra space.
+ sys.stdout.write(self.shell.separate_out)
+ outprompt = 'Out[{}]: '.format(self.shell.execution_count)
+ if self.do_full_cache:
+ sys.stdout.write(outprompt)
+
+ def compute_format_data(self, result):
+ """Compute format data of the object to be displayed.
+
+ The format data is a generalization of the :func:`repr` of an object.
+ In the default implementation the format data is a :class:`dict` of
+ key value pair where the keys are valid MIME types and the values
+ are JSON'able data structure containing the raw data for that MIME
+ type. It is up to frontends to determine pick a MIME to to use and
+ display that data in an appropriate manner.
+
+ This method only computes the format data for the object and should
+ NOT actually print or write that to a stream.
+
+ Parameters
+ ----------
+ result : object
+ The Python object passed to the display hook, whose format will be
+ computed.
+
+ Returns
+ -------
+ (format_dict, md_dict) : dict
+ format_dict is a :class:`dict` whose keys are valid MIME types and values are
+ JSON'able raw data for that MIME type. It is recommended that
+ all return values of this should always include the "text/plain"
+ MIME type representation of the object.
+ md_dict is a :class:`dict` with the same MIME type keys
+ of metadata associated with each output.
+
+ """
+ return self.shell.display_formatter.format(result)
+
+ # This can be set to True by the write_output_prompt method in a subclass
+ prompt_end_newline = False
+
+ def write_format_data(self, format_dict, md_dict=None) -> None:
+ """Write the format data dict to the frontend.
+
+ This default version of this method simply writes the plain text
+ representation of the object to ``sys.stdout``. Subclasses should
+ override this method to send the entire `format_dict` to the
+ frontends.
+
+ Parameters
+ ----------
+ format_dict : dict
+ The format dict for the object passed to `sys.displayhook`.
+ md_dict : dict (optional)
+ The metadata dict to be associated with the display data.
+ """
+ if 'text/plain' not in format_dict:
+ # nothing to do
+ return
+ # We want to print because we want to always make sure we have a
+ # newline, even if all the prompt separators are ''. This is the
+ # standard IPython behavior.
+ result_repr = format_dict['text/plain']
+ if '\n' in result_repr:
+ # So that multi-line strings line up with the left column of
+ # the screen, instead of having the output prompt mess up
+ # their first line.
+ # We use the prompt template instead of the expanded prompt
+ # because the expansion may add ANSI escapes that will interfere
+ # with our ability to determine whether or not we should add
+ # a newline.
+ if not self.prompt_end_newline:
+ # But avoid extraneous empty lines.
+ result_repr = '\n' + result_repr
+
+ try:
+ print(result_repr)
+ except UnicodeEncodeError:
+ # If a character is not supported by the terminal encoding replace
+ # it with its \u or \x representation
+ print(result_repr.encode(sys.stdout.encoding,'backslashreplace').decode(sys.stdout.encoding))
+
+ def update_user_ns(self, result):
+ """Update user_ns with various things like _, __, _1, etc."""
+
+ # Avoid recursive reference when displaying _oh/Out
+ if self.cache_size and result is not self.shell.user_ns['_oh']:
+ if len(self.shell.user_ns['_oh']) >= self.cache_size and self.do_full_cache:
+ self.cull_cache()
+
+ # Don't overwrite '_' and friends if '_' is in __builtin__
+ # (otherwise we cause buggy behavior for things like gettext). and
+ # do not overwrite _, __ or ___ if one of these has been assigned
+ # by the user.
+ update_unders = True
+ for unders in ['_'*i for i in range(1,4)]:
+ if not unders in self.shell.user_ns:
+ continue
+ if getattr(self, unders) is not self.shell.user_ns.get(unders):
+ update_unders = False
+
+ self.___ = self.__
+ self.__ = self._
+ self._ = result
+
+ if ('_' not in builtin_mod.__dict__) and (update_unders):
+ self.shell.push({'_':self._,
+ '__':self.__,
+ '___':self.___}, interactive=False)
+
+ # hackish access to top-level namespace to create _1,_2... dynamically
+ to_main = {}
+ if self.do_full_cache:
+ new_result = '_%s' % self.prompt_count
+ to_main[new_result] = result
+ self.shell.push(to_main, interactive=False)
+ self.shell.user_ns['_oh'][self.prompt_count] = result
+
+ def fill_exec_result(self, result):
+ if self.exec_result is not None:
+ self.exec_result.result = result
+
+ def log_output(self, format_dict):
+ """Log the output."""
+ if 'text/plain' not in format_dict:
+ # nothing to do
+ return
+ if self.shell.logger.log_output:
+ self.shell.logger.log_write(format_dict['text/plain'], 'output')
+ self.shell.history_manager.output_hist_reprs[self.prompt_count] = \
+ format_dict['text/plain']
+
+ def finish_displayhook(self):
+ """Finish up all displayhook activities."""
+ sys.stdout.write(self.shell.separate_out2)
+ sys.stdout.flush()
+
+ def __call__(self, result=None):
+ """Printing with history cache management.
+
+ This is invoked every time the interpreter needs to print, and is
+ activated by setting the variable sys.displayhook to it.
+ """
+ self.check_for_underscore()
+ if result is not None and not self.quiet():
+ self.start_displayhook()
+ self.write_output_prompt()
+ format_dict, md_dict = self.compute_format_data(result)
+ self.update_user_ns(result)
+ self.fill_exec_result(result)
+ if format_dict:
+ self.write_format_data(format_dict, md_dict)
+ self.log_output(format_dict)
+ self.finish_displayhook()
+
+ def cull_cache(self):
+ """Output cache is full, cull the oldest entries"""
+ oh = self.shell.user_ns.get('_oh', {})
+ sz = len(oh)
+ cull_count = max(int(sz * self.cull_fraction), 2)
+ warn('Output cache limit (currently {sz} entries) hit.\n'
+ 'Flushing oldest {cull_count} entries.'.format(sz=sz, cull_count=cull_count))
+
+ for i, n in enumerate(sorted(oh)):
+ if i >= cull_count:
+ break
+ self.shell.user_ns.pop('_%i' % n, None)
+ oh.pop(n, None)
+
+
+ def flush(self):
+ if not self.do_full_cache:
+ raise ValueError("You shouldn't have reached the cache flush "
+ "if full caching is not enabled!")
+ # delete auto-generated vars from global namespace
+
+ for n in range(1,self.prompt_count + 1):
+ key = '_'+repr(n)
+ try:
+ del self.shell.user_ns[key]
+ except: pass
+ # In some embedded circumstances, the user_ns doesn't have the
+ # '_oh' key set up.
+ oh = self.shell.user_ns.get('_oh', None)
+ if oh is not None:
+ oh.clear()
+
+ # Release our own references to objects:
+ self._, self.__, self.___ = '', '', ''
+
+ if '_' not in builtin_mod.__dict__:
+ self.shell.user_ns.update({'_':self._,'__':self.__,'___':self.___})
+ import gc
+ # TODO: Is this really needed?
+ # IronPython blocks here forever
+ if sys.platform != "cli":
+ gc.collect()
+
+
+class CapturingDisplayHook(object):
+ def __init__(self, shell, outputs=None):
+ self.shell = shell
+ if outputs is None:
+ outputs = []
+ self.outputs = outputs
+
+ def __call__(self, result=None):
+ if result is None:
+ return
+ format_dict, md_dict = self.shell.display_formatter.format(result)
+ self.outputs.append({ 'data': format_dict, 'metadata': md_dict })