aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/ipython/py2/IPython/lib/backgroundjobs.py
diff options
context:
space:
mode:
authorMikhail Borisov <borisov.mikhail@gmail.com>2022-02-10 16:45:39 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:39 +0300
commita6a92afe03e02795227d2641b49819b687f088f8 (patch)
treef6984a1d27d5a7ec88a6fdd6e20cd5b7693b6ece /contrib/python/ipython/py2/IPython/lib/backgroundjobs.py
parentc6dc8b8bd530985bc4cce0137e9a5de32f1087cb (diff)
downloadydb-a6a92afe03e02795227d2641b49819b687f088f8.tar.gz
Restoring authorship annotation for Mikhail Borisov <borisov.mikhail@gmail.com>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/ipython/py2/IPython/lib/backgroundjobs.py')
-rw-r--r--contrib/python/ipython/py2/IPython/lib/backgroundjobs.py980
1 files changed, 490 insertions, 490 deletions
diff --git a/contrib/python/ipython/py2/IPython/lib/backgroundjobs.py b/contrib/python/ipython/py2/IPython/lib/backgroundjobs.py
index b724126bbb..1acfe7df1e 100644
--- a/contrib/python/ipython/py2/IPython/lib/backgroundjobs.py
+++ b/contrib/python/ipython/py2/IPython/lib/backgroundjobs.py
@@ -1,491 +1,491 @@
-# -*- coding: utf-8 -*-
-"""Manage background (threaded) jobs conveniently from an interactive shell.
-
-This module provides a BackgroundJobManager class. This is the main class
-meant for public usage, it implements an object which can create and manage
-new background jobs.
-
-It also provides the actual job classes managed by these BackgroundJobManager
-objects, see their docstrings below.
-
-
-This system was inspired by discussions with B. Granger and the
-BackgroundCommand class described in the book Python Scripting for
-Computational Science, by H. P. Langtangen:
-
-http://folk.uio.no/hpl/scripting
-
-(although ultimately no code from this text was used, as IPython's system is a
-separate implementation).
-
-An example notebook is provided in our documentation illustrating interactive
-use of the system.
-"""
-from __future__ import print_function
-
-#*****************************************************************************
-# Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
-#
-# Distributed under the terms of the BSD License. The full license is in
-# the file COPYING, distributed as part of this software.
-#*****************************************************************************
-
-# Code begins
-import sys
-import threading
-
-from IPython import get_ipython
-from IPython.core.ultratb import AutoFormattedTB
+# -*- coding: utf-8 -*-
+"""Manage background (threaded) jobs conveniently from an interactive shell.
+
+This module provides a BackgroundJobManager class. This is the main class
+meant for public usage, it implements an object which can create and manage
+new background jobs.
+
+It also provides the actual job classes managed by these BackgroundJobManager
+objects, see their docstrings below.
+
+
+This system was inspired by discussions with B. Granger and the
+BackgroundCommand class described in the book Python Scripting for
+Computational Science, by H. P. Langtangen:
+
+http://folk.uio.no/hpl/scripting
+
+(although ultimately no code from this text was used, as IPython's system is a
+separate implementation).
+
+An example notebook is provided in our documentation illustrating interactive
+use of the system.
+"""
+from __future__ import print_function
+
+#*****************************************************************************
+# Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#*****************************************************************************
+
+# Code begins
+import sys
+import threading
+
+from IPython import get_ipython
+from IPython.core.ultratb import AutoFormattedTB
from logging import error
-from IPython.utils.py3compat import string_types
-
-
-class BackgroundJobManager(object):
- """Class to manage a pool of backgrounded threaded jobs.
-
- Below, we assume that 'jobs' is a BackgroundJobManager instance.
-
- Usage summary (see the method docstrings for details):
-
- jobs.new(...) -> start a new job
-
- jobs() or jobs.status() -> print status summary of all jobs
-
- jobs[N] -> returns job number N.
-
- foo = jobs[N].result -> assign to variable foo the result of job N
-
- jobs[N].traceback() -> print the traceback of dead job N
-
- jobs.remove(N) -> remove (finished) job N
-
- jobs.flush() -> remove all finished jobs
-
- As a convenience feature, BackgroundJobManager instances provide the
- utility result and traceback methods which retrieve the corresponding
- information from the jobs list:
-
- jobs.result(N) <--> jobs[N].result
- jobs.traceback(N) <--> jobs[N].traceback()
-
- While this appears minor, it allows you to use tab completion
- interactively on the job manager instance.
- """
-
- def __init__(self):
- # Lists for job management, accessed via a property to ensure they're
- # up to date.x
- self._running = []
- self._completed = []
- self._dead = []
- # A dict of all jobs, so users can easily access any of them
- self.all = {}
- # For reporting
- self._comp_report = []
- self._dead_report = []
- # Store status codes locally for fast lookups
- self._s_created = BackgroundJobBase.stat_created_c
- self._s_running = BackgroundJobBase.stat_running_c
- self._s_completed = BackgroundJobBase.stat_completed_c
- self._s_dead = BackgroundJobBase.stat_dead_c
-
- @property
- def running(self):
- self._update_status()
- return self._running
-
- @property
- def dead(self):
- self._update_status()
- return self._dead
-
- @property
- def completed(self):
- self._update_status()
- return self._completed
-
- def new(self, func_or_exp, *args, **kwargs):
- """Add a new background job and start it in a separate thread.
-
- There are two types of jobs which can be created:
-
- 1. Jobs based on expressions which can be passed to an eval() call.
- The expression must be given as a string. For example:
-
- job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
-
- The given expression is passed to eval(), along with the optional
- global/local dicts provided. If no dicts are given, they are
- extracted automatically from the caller's frame.
-
- A Python statement is NOT a valid eval() expression. Basically, you
- can only use as an eval() argument something which can go on the right
- of an '=' sign and be assigned to a variable.
-
- For example,"print 'hello'" is not valid, but '2+3' is.
-
- 2. Jobs given a function object, optionally passing additional
- positional arguments:
-
- job_manager.new(myfunc, x, y)
-
- The function is called with the given arguments.
-
- If you need to pass keyword arguments to your function, you must
- supply them as a dict named kw:
-
- job_manager.new(myfunc, x, y, kw=dict(z=1))
-
- The reason for this assymmetry is that the new() method needs to
- maintain access to its own keywords, and this prevents name collisions
- between arguments to new() and arguments to your own functions.
-
- In both cases, the result is stored in the job.result field of the
- background job object.
-
- You can set `daemon` attribute of the thread by giving the keyword
- argument `daemon`.
-
- Notes and caveats:
-
- 1. All threads running share the same standard output. Thus, if your
- background jobs generate output, it will come out on top of whatever
- you are currently writing. For this reason, background jobs are best
- used with silent functions which simply return their output.
-
- 2. Threads also all work within the same global namespace, and this
- system does not lock interactive variables. So if you send job to the
- background which operates on a mutable object for a long time, and
- start modifying that same mutable object interactively (or in another
- backgrounded job), all sorts of bizarre behaviour will occur.
-
- 3. If a background job is spending a lot of time inside a C extension
- module which does not release the Python Global Interpreter Lock
- (GIL), this will block the IPython prompt. This is simply because the
- Python interpreter can only switch between threads at Python
- bytecodes. While the execution is inside C code, the interpreter must
- simply wait unless the extension module releases the GIL.
-
- 4. There is no way, due to limitations in the Python threads library,
- to kill a thread once it has started."""
-
- if callable(func_or_exp):
- kw = kwargs.get('kw',{})
- job = BackgroundJobFunc(func_or_exp,*args,**kw)
- elif isinstance(func_or_exp, string_types):
- if not args:
- frame = sys._getframe(1)
- glob, loc = frame.f_globals, frame.f_locals
- elif len(args)==1:
- glob = loc = args[0]
- elif len(args)==2:
- glob,loc = args
- else:
- raise ValueError(
- 'Expression jobs take at most 2 args (globals,locals)')
- job = BackgroundJobExpr(func_or_exp, glob, loc)
- else:
- raise TypeError('invalid args for new job')
-
- if kwargs.get('daemon', False):
- job.daemon = True
- job.num = len(self.all)+1 if self.all else 0
- self.running.append(job)
- self.all[job.num] = job
- print('Starting job # %s in a separate thread.' % job.num)
- job.start()
- return job
-
- def __getitem__(self, job_key):
- num = job_key if isinstance(job_key, int) else job_key.num
- return self.all[num]
-
- def __call__(self):
- """An alias to self.status(),
-
- This allows you to simply call a job manager instance much like the
- Unix `jobs` shell command."""
-
- return self.status()
-
- def _update_status(self):
- """Update the status of the job lists.
-
- This method moves finished jobs to one of two lists:
- - self.completed: jobs which completed successfully
- - self.dead: jobs which finished but died.
-
- It also copies those jobs to corresponding _report lists. These lists
- are used to report jobs completed/dead since the last update, and are
- then cleared by the reporting function after each call."""
-
- # Status codes
- srun, scomp, sdead = self._s_running, self._s_completed, self._s_dead
- # State lists, use the actual lists b/c the public names are properties
- # that call this very function on access
- running, completed, dead = self._running, self._completed, self._dead
-
- # Now, update all state lists
- for num, job in enumerate(running):
- stat = job.stat_code
- if stat == srun:
- continue
- elif stat == scomp:
- completed.append(job)
- self._comp_report.append(job)
- running[num] = False
- elif stat == sdead:
- dead.append(job)
- self._dead_report.append(job)
- running[num] = False
- # Remove dead/completed jobs from running list
- running[:] = filter(None, running)
-
- def _group_report(self,group,name):
- """Report summary for a given job group.
-
- Return True if the group had any elements."""
-
- if group:
- print('%s jobs:' % name)
- for job in group:
- print('%s : %s' % (job.num,job))
- print()
- return True
-
- def _group_flush(self,group,name):
- """Flush a given job group
-
- Return True if the group had any elements."""
-
- njobs = len(group)
- if njobs:
- plural = {1:''}.setdefault(njobs,'s')
- print('Flushing %s %s job%s.' % (njobs,name,plural))
- group[:] = []
- return True
-
- def _status_new(self):
- """Print the status of newly finished jobs.
-
- Return True if any new jobs are reported.
-
- This call resets its own state every time, so it only reports jobs
- which have finished since the last time it was called."""
-
- self._update_status()
- new_comp = self._group_report(self._comp_report, 'Completed')
- new_dead = self._group_report(self._dead_report,
- 'Dead, call jobs.traceback() for details')
- self._comp_report[:] = []
- self._dead_report[:] = []
- return new_comp or new_dead
-
- def status(self,verbose=0):
- """Print a status of all jobs currently being managed."""
-
- self._update_status()
- self._group_report(self.running,'Running')
- self._group_report(self.completed,'Completed')
- self._group_report(self.dead,'Dead')
- # Also flush the report queues
- self._comp_report[:] = []
- self._dead_report[:] = []
-
- def remove(self,num):
- """Remove a finished (completed or dead) job."""
-
- try:
- job = self.all[num]
- except KeyError:
- error('Job #%s not found' % num)
- else:
- stat_code = job.stat_code
- if stat_code == self._s_running:
- error('Job #%s is still running, it can not be removed.' % num)
- return
- elif stat_code == self._s_completed:
- self.completed.remove(job)
- elif stat_code == self._s_dead:
- self.dead.remove(job)
-
- def flush(self):
- """Flush all finished jobs (completed and dead) from lists.
-
- Running jobs are never flushed.
-
- It first calls _status_new(), to update info. If any jobs have
- completed since the last _status_new() call, the flush operation
- aborts."""
-
- # Remove the finished jobs from the master dict
- alljobs = self.all
- for job in self.completed+self.dead:
- del(alljobs[job.num])
-
- # Now flush these lists completely
- fl_comp = self._group_flush(self.completed, 'Completed')
- fl_dead = self._group_flush(self.dead, 'Dead')
- if not (fl_comp or fl_dead):
- print('No jobs to flush.')
-
- def result(self,num):
- """result(N) -> return the result of job N."""
- try:
- return self.all[num].result
- except KeyError:
- error('Job #%s not found' % num)
-
- def _traceback(self, job):
- num = job if isinstance(job, int) else job.num
- try:
- self.all[num].traceback()
- except KeyError:
- error('Job #%s not found' % num)
-
- def traceback(self, job=None):
- if job is None:
- self._update_status()
- for deadjob in self.dead:
- print("Traceback for: %r" % deadjob)
- self._traceback(deadjob)
- print()
- else:
- self._traceback(job)
-
-
-class BackgroundJobBase(threading.Thread):
- """Base class to build BackgroundJob classes.
-
- The derived classes must implement:
-
- - Their own __init__, since the one here raises NotImplementedError. The
- derived constructor must call self._init() at the end, to provide common
- initialization.
-
- - A strform attribute used in calls to __str__.
-
- - A call() method, which will make the actual execution call and must
- return a value to be held in the 'result' field of the job object.
- """
-
- # Class constants for status, in string and as numerical codes (when
- # updating jobs lists, we don't want to do string comparisons). This will
- # be done at every user prompt, so it has to be as fast as possible
- stat_created = 'Created'; stat_created_c = 0
- stat_running = 'Running'; stat_running_c = 1
- stat_completed = 'Completed'; stat_completed_c = 2
- stat_dead = 'Dead (Exception), call jobs.traceback() for details'
- stat_dead_c = -1
-
- def __init__(self):
- """Must be implemented in subclasses.
-
- Subclasses must call :meth:`_init` for standard initialisation.
- """
- raise NotImplementedError("This class can not be instantiated directly.")
-
- def _init(self):
- """Common initialization for all BackgroundJob objects"""
-
- for attr in ['call','strform']:
- assert hasattr(self,attr), "Missing attribute <%s>" % attr
-
- # The num tag can be set by an external job manager
- self.num = None
-
- self.status = BackgroundJobBase.stat_created
- self.stat_code = BackgroundJobBase.stat_created_c
- self.finished = False
- self.result = '<BackgroundJob has not completed>'
-
- # reuse the ipython traceback handler if we can get to it, otherwise
- # make a new one
- try:
- make_tb = get_ipython().InteractiveTB.text
- except:
- make_tb = AutoFormattedTB(mode = 'Context',
- color_scheme='NoColor',
- tb_offset = 1).text
- # Note that the actual API for text() requires the three args to be
- # passed in, so we wrap it in a simple lambda.
- self._make_tb = lambda : make_tb(None, None, None)
-
- # Hold a formatted traceback if one is generated.
- self._tb = None
-
- threading.Thread.__init__(self)
-
- def __str__(self):
- return self.strform
-
- def __repr__(self):
- return '<BackgroundJob #%d: %s>' % (self.num, self.strform)
-
- def traceback(self):
- print(self._tb)
-
- def run(self):
- try:
- self.status = BackgroundJobBase.stat_running
- self.stat_code = BackgroundJobBase.stat_running_c
- self.result = self.call()
- except:
- self.status = BackgroundJobBase.stat_dead
- self.stat_code = BackgroundJobBase.stat_dead_c
- self.finished = None
- self.result = ('<BackgroundJob died, call jobs.traceback() for details>')
- self._tb = self._make_tb()
- else:
- self.status = BackgroundJobBase.stat_completed
- self.stat_code = BackgroundJobBase.stat_completed_c
- self.finished = True
-
-
-class BackgroundJobExpr(BackgroundJobBase):
- """Evaluate an expression as a background job (uses a separate thread)."""
-
- def __init__(self, expression, glob=None, loc=None):
- """Create a new job from a string which can be fed to eval().
-
- global/locals dicts can be provided, which will be passed to the eval
- call."""
-
- # fail immediately if the given expression can't be compiled
- self.code = compile(expression,'<BackgroundJob compilation>','eval')
-
- glob = {} if glob is None else glob
- loc = {} if loc is None else loc
- self.expression = self.strform = expression
- self.glob = glob
- self.loc = loc
- self._init()
-
- def call(self):
- return eval(self.code,self.glob,self.loc)
-
-
-class BackgroundJobFunc(BackgroundJobBase):
- """Run a function call as a background job (uses a separate thread)."""
-
- def __init__(self, func, *args, **kwargs):
- """Create a new job from a callable object.
-
- Any positional arguments and keyword args given to this constructor
- after the initial callable are passed directly to it."""
-
- if not callable(func):
- raise TypeError(
- 'first argument to BackgroundJobFunc must be callable')
-
- self.func = func
- self.args = args
- self.kwargs = kwargs
- # The string form will only include the function passed, because
- # generating string representations of the arguments is a potentially
- # _very_ expensive operation (e.g. with large arrays).
- self.strform = str(func)
- self._init()
-
- def call(self):
- return self.func(*self.args, **self.kwargs)
+from IPython.utils.py3compat import string_types
+
+
+class BackgroundJobManager(object):
+ """Class to manage a pool of backgrounded threaded jobs.
+
+ Below, we assume that 'jobs' is a BackgroundJobManager instance.
+
+ Usage summary (see the method docstrings for details):
+
+ jobs.new(...) -> start a new job
+
+ jobs() or jobs.status() -> print status summary of all jobs
+
+ jobs[N] -> returns job number N.
+
+ foo = jobs[N].result -> assign to variable foo the result of job N
+
+ jobs[N].traceback() -> print the traceback of dead job N
+
+ jobs.remove(N) -> remove (finished) job N
+
+ jobs.flush() -> remove all finished jobs
+
+ As a convenience feature, BackgroundJobManager instances provide the
+ utility result and traceback methods which retrieve the corresponding
+ information from the jobs list:
+
+ jobs.result(N) <--> jobs[N].result
+ jobs.traceback(N) <--> jobs[N].traceback()
+
+ While this appears minor, it allows you to use tab completion
+ interactively on the job manager instance.
+ """
+
+ def __init__(self):
+ # Lists for job management, accessed via a property to ensure they're
+ # up to date.x
+ self._running = []
+ self._completed = []
+ self._dead = []
+ # A dict of all jobs, so users can easily access any of them
+ self.all = {}
+ # For reporting
+ self._comp_report = []
+ self._dead_report = []
+ # Store status codes locally for fast lookups
+ self._s_created = BackgroundJobBase.stat_created_c
+ self._s_running = BackgroundJobBase.stat_running_c
+ self._s_completed = BackgroundJobBase.stat_completed_c
+ self._s_dead = BackgroundJobBase.stat_dead_c
+
+ @property
+ def running(self):
+ self._update_status()
+ return self._running
+
+ @property
+ def dead(self):
+ self._update_status()
+ return self._dead
+
+ @property
+ def completed(self):
+ self._update_status()
+ return self._completed
+
+ def new(self, func_or_exp, *args, **kwargs):
+ """Add a new background job and start it in a separate thread.
+
+ There are two types of jobs which can be created:
+
+ 1. Jobs based on expressions which can be passed to an eval() call.
+ The expression must be given as a string. For example:
+
+ job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
+
+ The given expression is passed to eval(), along with the optional
+ global/local dicts provided. If no dicts are given, they are
+ extracted automatically from the caller's frame.
+
+ A Python statement is NOT a valid eval() expression. Basically, you
+ can only use as an eval() argument something which can go on the right
+ of an '=' sign and be assigned to a variable.
+
+ For example,"print 'hello'" is not valid, but '2+3' is.
+
+ 2. Jobs given a function object, optionally passing additional
+ positional arguments:
+
+ job_manager.new(myfunc, x, y)
+
+ The function is called with the given arguments.
+
+ If you need to pass keyword arguments to your function, you must
+ supply them as a dict named kw:
+
+ job_manager.new(myfunc, x, y, kw=dict(z=1))
+
+ The reason for this assymmetry is that the new() method needs to
+ maintain access to its own keywords, and this prevents name collisions
+ between arguments to new() and arguments to your own functions.
+
+ In both cases, the result is stored in the job.result field of the
+ background job object.
+
+ You can set `daemon` attribute of the thread by giving the keyword
+ argument `daemon`.
+
+ Notes and caveats:
+
+ 1. All threads running share the same standard output. Thus, if your
+ background jobs generate output, it will come out on top of whatever
+ you are currently writing. For this reason, background jobs are best
+ used with silent functions which simply return their output.
+
+ 2. Threads also all work within the same global namespace, and this
+ system does not lock interactive variables. So if you send job to the
+ background which operates on a mutable object for a long time, and
+ start modifying that same mutable object interactively (or in another
+ backgrounded job), all sorts of bizarre behaviour will occur.
+
+ 3. If a background job is spending a lot of time inside a C extension
+ module which does not release the Python Global Interpreter Lock
+ (GIL), this will block the IPython prompt. This is simply because the
+ Python interpreter can only switch between threads at Python
+ bytecodes. While the execution is inside C code, the interpreter must
+ simply wait unless the extension module releases the GIL.
+
+ 4. There is no way, due to limitations in the Python threads library,
+ to kill a thread once it has started."""
+
+ if callable(func_or_exp):
+ kw = kwargs.get('kw',{})
+ job = BackgroundJobFunc(func_or_exp,*args,**kw)
+ elif isinstance(func_or_exp, string_types):
+ if not args:
+ frame = sys._getframe(1)
+ glob, loc = frame.f_globals, frame.f_locals
+ elif len(args)==1:
+ glob = loc = args[0]
+ elif len(args)==2:
+ glob,loc = args
+ else:
+ raise ValueError(
+ 'Expression jobs take at most 2 args (globals,locals)')
+ job = BackgroundJobExpr(func_or_exp, glob, loc)
+ else:
+ raise TypeError('invalid args for new job')
+
+ if kwargs.get('daemon', False):
+ job.daemon = True
+ job.num = len(self.all)+1 if self.all else 0
+ self.running.append(job)
+ self.all[job.num] = job
+ print('Starting job # %s in a separate thread.' % job.num)
+ job.start()
+ return job
+
+ def __getitem__(self, job_key):
+ num = job_key if isinstance(job_key, int) else job_key.num
+ return self.all[num]
+
+ def __call__(self):
+ """An alias to self.status(),
+
+ This allows you to simply call a job manager instance much like the
+ Unix `jobs` shell command."""
+
+ return self.status()
+
+ def _update_status(self):
+ """Update the status of the job lists.
+
+ This method moves finished jobs to one of two lists:
+ - self.completed: jobs which completed successfully
+ - self.dead: jobs which finished but died.
+
+ It also copies those jobs to corresponding _report lists. These lists
+ are used to report jobs completed/dead since the last update, and are
+ then cleared by the reporting function after each call."""
+
+ # Status codes
+ srun, scomp, sdead = self._s_running, self._s_completed, self._s_dead
+ # State lists, use the actual lists b/c the public names are properties
+ # that call this very function on access
+ running, completed, dead = self._running, self._completed, self._dead
+
+ # Now, update all state lists
+ for num, job in enumerate(running):
+ stat = job.stat_code
+ if stat == srun:
+ continue
+ elif stat == scomp:
+ completed.append(job)
+ self._comp_report.append(job)
+ running[num] = False
+ elif stat == sdead:
+ dead.append(job)
+ self._dead_report.append(job)
+ running[num] = False
+ # Remove dead/completed jobs from running list
+ running[:] = filter(None, running)
+
+ def _group_report(self,group,name):
+ """Report summary for a given job group.
+
+ Return True if the group had any elements."""
+
+ if group:
+ print('%s jobs:' % name)
+ for job in group:
+ print('%s : %s' % (job.num,job))
+ print()
+ return True
+
+ def _group_flush(self,group,name):
+ """Flush a given job group
+
+ Return True if the group had any elements."""
+
+ njobs = len(group)
+ if njobs:
+ plural = {1:''}.setdefault(njobs,'s')
+ print('Flushing %s %s job%s.' % (njobs,name,plural))
+ group[:] = []
+ return True
+
+ def _status_new(self):
+ """Print the status of newly finished jobs.
+
+ Return True if any new jobs are reported.
+
+ This call resets its own state every time, so it only reports jobs
+ which have finished since the last time it was called."""
+
+ self._update_status()
+ new_comp = self._group_report(self._comp_report, 'Completed')
+ new_dead = self._group_report(self._dead_report,
+ 'Dead, call jobs.traceback() for details')
+ self._comp_report[:] = []
+ self._dead_report[:] = []
+ return new_comp or new_dead
+
+ def status(self,verbose=0):
+ """Print a status of all jobs currently being managed."""
+
+ self._update_status()
+ self._group_report(self.running,'Running')
+ self._group_report(self.completed,'Completed')
+ self._group_report(self.dead,'Dead')
+ # Also flush the report queues
+ self._comp_report[:] = []
+ self._dead_report[:] = []
+
+ def remove(self,num):
+ """Remove a finished (completed or dead) job."""
+
+ try:
+ job = self.all[num]
+ except KeyError:
+ error('Job #%s not found' % num)
+ else:
+ stat_code = job.stat_code
+ if stat_code == self._s_running:
+ error('Job #%s is still running, it can not be removed.' % num)
+ return
+ elif stat_code == self._s_completed:
+ self.completed.remove(job)
+ elif stat_code == self._s_dead:
+ self.dead.remove(job)
+
+ def flush(self):
+ """Flush all finished jobs (completed and dead) from lists.
+
+ Running jobs are never flushed.
+
+ It first calls _status_new(), to update info. If any jobs have
+ completed since the last _status_new() call, the flush operation
+ aborts."""
+
+ # Remove the finished jobs from the master dict
+ alljobs = self.all
+ for job in self.completed+self.dead:
+ del(alljobs[job.num])
+
+ # Now flush these lists completely
+ fl_comp = self._group_flush(self.completed, 'Completed')
+ fl_dead = self._group_flush(self.dead, 'Dead')
+ if not (fl_comp or fl_dead):
+ print('No jobs to flush.')
+
+ def result(self,num):
+ """result(N) -> return the result of job N."""
+ try:
+ return self.all[num].result
+ except KeyError:
+ error('Job #%s not found' % num)
+
+ def _traceback(self, job):
+ num = job if isinstance(job, int) else job.num
+ try:
+ self.all[num].traceback()
+ except KeyError:
+ error('Job #%s not found' % num)
+
+ def traceback(self, job=None):
+ if job is None:
+ self._update_status()
+ for deadjob in self.dead:
+ print("Traceback for: %r" % deadjob)
+ self._traceback(deadjob)
+ print()
+ else:
+ self._traceback(job)
+
+
+class BackgroundJobBase(threading.Thread):
+ """Base class to build BackgroundJob classes.
+
+ The derived classes must implement:
+
+ - Their own __init__, since the one here raises NotImplementedError. The
+ derived constructor must call self._init() at the end, to provide common
+ initialization.
+
+ - A strform attribute used in calls to __str__.
+
+ - A call() method, which will make the actual execution call and must
+ return a value to be held in the 'result' field of the job object.
+ """
+
+ # Class constants for status, in string and as numerical codes (when
+ # updating jobs lists, we don't want to do string comparisons). This will
+ # be done at every user prompt, so it has to be as fast as possible
+ stat_created = 'Created'; stat_created_c = 0
+ stat_running = 'Running'; stat_running_c = 1
+ stat_completed = 'Completed'; stat_completed_c = 2
+ stat_dead = 'Dead (Exception), call jobs.traceback() for details'
+ stat_dead_c = -1
+
+ def __init__(self):
+ """Must be implemented in subclasses.
+
+ Subclasses must call :meth:`_init` for standard initialisation.
+ """
+ raise NotImplementedError("This class can not be instantiated directly.")
+
+ def _init(self):
+ """Common initialization for all BackgroundJob objects"""
+
+ for attr in ['call','strform']:
+ assert hasattr(self,attr), "Missing attribute <%s>" % attr
+
+ # The num tag can be set by an external job manager
+ self.num = None
+
+ self.status = BackgroundJobBase.stat_created
+ self.stat_code = BackgroundJobBase.stat_created_c
+ self.finished = False
+ self.result = '<BackgroundJob has not completed>'
+
+ # reuse the ipython traceback handler if we can get to it, otherwise
+ # make a new one
+ try:
+ make_tb = get_ipython().InteractiveTB.text
+ except:
+ make_tb = AutoFormattedTB(mode = 'Context',
+ color_scheme='NoColor',
+ tb_offset = 1).text
+ # Note that the actual API for text() requires the three args to be
+ # passed in, so we wrap it in a simple lambda.
+ self._make_tb = lambda : make_tb(None, None, None)
+
+ # Hold a formatted traceback if one is generated.
+ self._tb = None
+
+ threading.Thread.__init__(self)
+
+ def __str__(self):
+ return self.strform
+
+ def __repr__(self):
+ return '<BackgroundJob #%d: %s>' % (self.num, self.strform)
+
+ def traceback(self):
+ print(self._tb)
+
+ def run(self):
+ try:
+ self.status = BackgroundJobBase.stat_running
+ self.stat_code = BackgroundJobBase.stat_running_c
+ self.result = self.call()
+ except:
+ self.status = BackgroundJobBase.stat_dead
+ self.stat_code = BackgroundJobBase.stat_dead_c
+ self.finished = None
+ self.result = ('<BackgroundJob died, call jobs.traceback() for details>')
+ self._tb = self._make_tb()
+ else:
+ self.status = BackgroundJobBase.stat_completed
+ self.stat_code = BackgroundJobBase.stat_completed_c
+ self.finished = True
+
+
+class BackgroundJobExpr(BackgroundJobBase):
+ """Evaluate an expression as a background job (uses a separate thread)."""
+
+ def __init__(self, expression, glob=None, loc=None):
+ """Create a new job from a string which can be fed to eval().
+
+ global/locals dicts can be provided, which will be passed to the eval
+ call."""
+
+ # fail immediately if the given expression can't be compiled
+ self.code = compile(expression,'<BackgroundJob compilation>','eval')
+
+ glob = {} if glob is None else glob
+ loc = {} if loc is None else loc
+ self.expression = self.strform = expression
+ self.glob = glob
+ self.loc = loc
+ self._init()
+
+ def call(self):
+ return eval(self.code,self.glob,self.loc)
+
+
+class BackgroundJobFunc(BackgroundJobBase):
+ """Run a function call as a background job (uses a separate thread)."""
+
+ def __init__(self, func, *args, **kwargs):
+ """Create a new job from a callable object.
+
+ Any positional arguments and keyword args given to this constructor
+ after the initial callable are passed directly to it."""
+
+ if not callable(func):
+ raise TypeError(
+ 'first argument to BackgroundJobFunc must be callable')
+
+ self.func = func
+ self.args = args
+ self.kwargs = kwargs
+ # The string form will only include the function passed, because
+ # generating string representations of the arguments is a potentially
+ # _very_ expensive operation (e.g. with large arrays).
+ self.strform = str(func)
+ self._init()
+
+ def call(self):
+ return self.func(*self.args, **self.kwargs)