aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/ipython/py2/IPython/lib/backgroundjobs.py
blob: 1acfe7df1eea729597b8df85d6f08d03217b3b91 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
# -*- coding: utf-8 -*- 
"""Manage background (threaded) jobs conveniently from an interactive shell. 
 
This module provides a BackgroundJobManager class.  This is the main class
meant for public usage; it implements an object which can create and manage
new background jobs.
 
It also provides the actual job classes managed by these BackgroundJobManager 
objects, see their docstrings below. 
 
 
This system was inspired by discussions with B. Granger and the 
BackgroundCommand class described in the book Python Scripting for 
Computational Science, by H. P. Langtangen: 
 
http://folk.uio.no/hpl/scripting 
 
(although ultimately no code from this text was used, as IPython's system is a 
separate implementation). 
 
An example notebook is provided in our documentation illustrating interactive 
use of the system. 
""" 
from __future__ import print_function 
 
#***************************************************************************** 
#       Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu> 
# 
#  Distributed under the terms of the BSD License.  The full license is in 
#  the file COPYING, distributed as part of this software. 
#***************************************************************************** 
 
# Code begins 
import sys 
import threading 
 
from IPython import get_ipython 
from IPython.core.ultratb import AutoFormattedTB 
from logging import error
from IPython.utils.py3compat import string_types 
 
 
class BackgroundJobManager(object):
    """Class to manage a pool of backgrounded threaded jobs.

    Below, we assume that 'jobs' is a BackgroundJobManager instance.

    Usage summary (see the method docstrings for details):

      jobs.new(...) -> start a new job

      jobs() or jobs.status() -> print status summary of all jobs

      jobs[N] -> returns job number N.

      foo = jobs[N].result -> assign to variable foo the result of job N

      jobs[N].traceback() -> print the traceback of dead job N

      jobs.remove(N) -> remove (finished) job N

      jobs.flush() -> remove all finished jobs

    As a convenience feature, BackgroundJobManager instances provide the
    utility result and traceback methods which retrieve the corresponding
    information from the jobs list:

      jobs.result(N) <--> jobs[N].result
      jobs.traceback(N) <--> jobs[N].traceback()

    While this appears minor, it allows you to use tab completion
    interactively on the job manager instance.
    """

    def __init__(self):
        """Create an empty manager with no registered jobs."""
        # Lists for job management, accessed via properties to ensure they're
        # up to date.
        self._running = []
        self._completed = []
        self._dead = []
        # A dict of all jobs (keyed by job number), so users can easily
        # access any of them
        self.all = {}
        # For reporting: jobs that finished since the last report
        self._comp_report = []
        self._dead_report = []
        # Store status codes locally for fast lookups
        self._s_created = BackgroundJobBase.stat_created_c
        self._s_running = BackgroundJobBase.stat_running_c
        self._s_completed = BackgroundJobBase.stat_completed_c
        self._s_dead = BackgroundJobBase.stat_dead_c

    @property
    def running(self):
        """List of jobs not yet finished (refreshed on every access)."""
        self._update_status()
        return self._running

    @property
    def dead(self):
        """List of jobs that died with an exception (refreshed on access)."""
        self._update_status()
        return self._dead

    @property
    def completed(self):
        """List of jobs that completed normally (refreshed on access)."""
        self._update_status()
        return self._completed

    def _next_job_num(self):
        """Return a job number that is not currently used in self.all.

        The historical numbering scheme (0 for the first job, otherwise
        len(self.all)+1) is kept as the starting point, but after a flush()
        that value can coincide with the number of a job that is still
        registered.  In that case we advance to the next free number instead
        of silently overwriting the existing entry.
        """
        registry = self.all
        num = len(registry) + 1 if registry else 0
        while num in registry:
            num += 1
        return num

    def new(self, func_or_exp, *args, **kwargs):
        """Add a new background job and start it in a separate thread.

        There are two types of jobs which can be created:

        1. Jobs based on expressions which can be passed to an eval() call.
        The expression must be given as a string.  For example:

          job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])

        The given expression is passed to eval(), along with the optional
        global/local dicts provided.  If no dicts are given, they are
        extracted automatically from the caller's frame.

        A Python statement is NOT a valid eval() expression.  Basically, you
        can only use as an eval() argument something which can go on the right
        of an '=' sign and be assigned to a variable.

        For example,"print 'hello'" is not valid, but '2+3' is.

        2. Jobs given a function object, optionally passing additional
        positional arguments:

          job_manager.new(myfunc, x, y)

        The function is called with the given arguments.

        If you need to pass keyword arguments to your function, you must
        supply them as a dict named kw:

          job_manager.new(myfunc, x, y, kw=dict(z=1))

        The reason for this asymmetry is that the new() method needs to
        maintain access to its own keywords, and this prevents name collisions
        between arguments to new() and arguments to your own functions.

        In both cases, the result is stored in the job.result field of the
        background job object.

        You can set `daemon` attribute of the thread by giving the keyword
        argument `daemon`.

        Returns the newly started job object.  Raises TypeError if
        func_or_exp is neither callable nor a string, and ValueError if an
        expression job is given more than two extra positional arguments.

        Notes and caveats:

        1. All threads running share the same standard output.  Thus, if your
        background jobs generate output, it will come out on top of whatever
        you are currently writing.  For this reason, background jobs are best
        used with silent functions which simply return their output.

        2. Threads also all work within the same global namespace, and this
        system does not lock interactive variables.  So if you send job to the
        background which operates on a mutable object for a long time, and
        start modifying that same mutable object interactively (or in another
        backgrounded job), all sorts of bizarre behaviour will occur.

        3. If a background job is spending a lot of time inside a C extension
        module which does not release the Python Global Interpreter Lock
        (GIL), this will block the IPython prompt.  This is simply because the
        Python interpreter can only switch between threads at Python
        bytecodes.  While the execution is inside C code, the interpreter must
        simply wait unless the extension module releases the GIL.

        4. There is no way, due to limitations in the Python threads library,
        to kill a thread once it has started."""

        if callable(func_or_exp):
            kw = kwargs.get('kw', {})
            job = BackgroundJobFunc(func_or_exp, *args, **kw)
        elif isinstance(func_or_exp, string_types):
            if not args:
                # This lookup must stay directly inside new(): depth 1 is
                # only the caller's frame when counted from this method.
                frame = sys._getframe(1)
                glob, loc = frame.f_globals, frame.f_locals
            elif len(args) == 1:
                glob = loc = args[0]
            elif len(args) == 2:
                glob, loc = args
            else:
                raise ValueError(
                      'Expression jobs take at most 2 args (globals,locals)')
            job = BackgroundJobExpr(func_or_exp, glob, loc)
        else:
            raise TypeError('invalid args for new job')

        if kwargs.get('daemon', False):
            job.daemon = True
        job.num = self._next_job_num()
        self.running.append(job)
        self.all[job.num] = job
        print('Starting job # %s in a separate thread.' % job.num)
        job.start()
        return job

    def __getitem__(self, job_key):
        """Return the job for a job number or for an object with a .num.

        Raises KeyError if no job is registered under that number."""
        num = job_key if isinstance(job_key, int) else job_key.num
        return self.all[num]

    def __call__(self):
        """An alias to self.status(),

        This allows you to simply call a job manager instance much like the
        Unix `jobs` shell command."""

        return self.status()

    def _update_status(self):
        """Update the status of the job lists.

        This method moves finished jobs to one of two lists:
          - self.completed: jobs which completed successfully
          - self.dead: jobs which finished but died.

        It also copies those jobs to corresponding _report lists.  These lists
        are used to report jobs completed/dead since the last update, and are
        then cleared by the reporting function after each call."""

        # Status codes
        scomp, sdead = self._s_completed, self._s_dead

        # Use the actual lists b/c the public names are properties that call
        # this very function on access.
        still_running = []
        for job in self._running:
            # Read the code once: the job's thread may change it at any time.
            stat = job.stat_code
            if stat == scomp:
                self._completed.append(job)
                self._comp_report.append(job)
            elif stat == sdead:
                self._dead.append(job)
                self._dead_report.append(job)
            else:
                # Running, or created but not yet started by its thread.
                still_running.append(job)
        # Update in place so references to the list stay valid.
        self._running[:] = still_running

    def _group_report(self, group, name):
        """Report summary for a given job group.

        Return True if the group had any elements, False otherwise."""

        if not group:
            return False
        print('%s jobs:' % name)
        for job in group:
            print('%s : %s' % (job.num, job))
        print()
        return True

    def _group_flush(self, group, name):
        """Flush a given job group

        The list is emptied in place.  Return True if the group had any
        elements, False otherwise."""

        njobs = len(group)
        if not njobs:
            return False
        plural = '' if njobs == 1 else 's'
        print('Flushing %s %s job%s.' % (njobs, name, plural))
        group[:] = []
        return True

    def _status_new(self):
        """Print the status of newly finished jobs.

        Return True if any new jobs are reported.

        This call resets its own state every time, so it only reports jobs
        which have finished since the last time it was called."""

        self._update_status()
        new_comp = self._group_report(self._comp_report, 'Completed')
        new_dead = self._group_report(self._dead_report,
                                      'Dead, call jobs.traceback() for details')
        self._comp_report[:] = []
        self._dead_report[:] = []
        return new_comp or new_dead

    def status(self, verbose=0):
        """Print a status of all jobs currently being managed.

        The `verbose` argument is accepted but currently unused."""

        # Refresh once and report from that single snapshot, so a job that
        # finishes while we are printing is not listed under two groups.
        self._update_status()
        self._group_report(self._running, 'Running')
        self._group_report(self._completed, 'Completed')
        self._group_report(self._dead, 'Dead')
        # Also flush the report queues
        self._comp_report[:] = []
        self._dead_report[:] = []

    def remove(self, num):
        """Remove a finished (completed or dead) job.

        The job is dropped both from its status list and from self.all.  An
        error is logged (and nothing is removed) if the job number is unknown
        or the job is still running."""

        try:
            job = self.all[num]
        except KeyError:
            error('Job #%s not found' % num)
            return

        # Read the status code *before* touching the properties below: they
        # refresh the lists, which guarantees the job is in the list that
        # matches the code we just read.
        stat_code = job.stat_code
        if stat_code == self._s_running:
            error('Job #%s is still running, it can not be removed.' % num)
            return
        elif stat_code == self._s_completed:
            self.completed.remove(job)
        elif stat_code == self._s_dead:
            self.dead.remove(job)
        else:
            # Created but not started yet: not finished, leave it alone.
            return
        # Also forget the job in the master dict, otherwise it would stay
        # reachable forever (flush() only looks at the status lists).
        del self.all[num]

    def flush(self):
        """Flush all finished jobs (completed and dead) from lists.

        Running jobs are never flushed.

        The job lists are refreshed once at the start; jobs finishing while
        the flush is in progress are left for the next call."""

        # Take a single snapshot of the state.  Going through the public
        # properties would refresh the lists on every access, and a job
        # finishing in between would be dropped from its list while staying
        # in the master dict.
        self._update_status()
        completed, dead = self._completed, self._dead

        # Remove the finished jobs from the master dict
        alljobs = self.all
        for job in completed + dead:
            alljobs.pop(job.num, None)

        # Now flush these lists completely
        fl_comp = self._group_flush(completed, 'Completed')
        fl_dead = self._group_flush(dead, 'Dead')
        if not (fl_comp or fl_dead):
            print('No jobs to flush.')

    def result(self, num):
        """result(N) -> return the result of job N.

        Logs an error and returns None if there is no job N."""
        try:
            return self.all[num].result
        except KeyError:
            error('Job #%s not found' % num)

    def _traceback(self, job):
        """Print the traceback of one job, given as a number or job object."""
        num = job if isinstance(job, int) else job.num
        try:
            self.all[num].traceback()
        except KeyError:
            error('Job #%s not found' % num)

    def traceback(self, job=None):
        """Print the traceback of a job, or of all dead jobs if job is None.

        `job` may be a job number or a job object."""
        if job is None:
            self._update_status()
            for deadjob in self.dead:
                print("Traceback for: %r" % deadjob)
                self._traceback(deadjob)
                print()
        else:
            self._traceback(job)
 
 
class BackgroundJobBase(threading.Thread):
    """Base class to build BackgroundJob classes.

    The derived classes must implement:

    - Their own __init__, since the one here raises NotImplementedError.  The
      derived constructor must call self._init() at the end, to provide common
      initialization.

    - A strform attribute used in calls to __str__.

    - A call() method, which will make the actual execution call and must
      return a value to be held in the 'result' field of the job object.
    """

    # Class constants for status, in string and as numerical codes (when
    # updating jobs lists, we don't want to do string comparisons).  This will
    # be done at every user prompt, so it has to be as fast as possible
    stat_created   = 'Created'; stat_created_c = 0
    stat_running   = 'Running'; stat_running_c = 1
    stat_completed = 'Completed'; stat_completed_c = 2
    stat_dead      = 'Dead (Exception), call jobs.traceback() for details'
    stat_dead_c = -1

    def __init__(self):
        """Must be implemented in subclasses.

        Subclasses must call :meth:`_init` for standard initialisation.
        """
        raise NotImplementedError("This class can not be instantiated directly.")

    def _init(self):
        """Common initialization for all BackgroundJob objects.

        Checks that the subclass provided `call` and `strform`, sets the
        initial status fields, picks a traceback formatter and finally
        initializes the underlying thread."""

        for attr in ['call','strform']:
            assert hasattr(self,attr), "Missing attribute <%s>" % attr

        # The num tag can be set by an external job manager
        self.num = None

        self.status    = BackgroundJobBase.stat_created
        self.stat_code = BackgroundJobBase.stat_created_c
        self.finished  = False
        self.result    = '<BackgroundJob has not completed>'

        # reuse the ipython traceback handler if we can get to it, otherwise
        # make a new one.  Only ordinary errors trigger the fallback;
        # KeyboardInterrupt/SystemExit are allowed to propagate.
        try:
            make_tb = get_ipython().InteractiveTB.text
        except Exception:
            make_tb = AutoFormattedTB(mode = 'Context',
                                      color_scheme='NoColor',
                                      tb_offset = 1).text
        # Note that the actual API for text() requires the three args to be
        # passed in, so we wrap it in a simple lambda.
        self._make_tb = lambda : make_tb(None, None, None)

        # Hold a formatted traceback if one is generated.
        self._tb = None

        threading.Thread.__init__(self)

    def __str__(self):
        return self.strform

    def __repr__(self):
        # Use %s for the number: it is None until a manager assigns one, and
        # %d would raise TypeError in that case.
        return '<BackgroundJob #%s: %s>' % (self.num, self.strform)

    def traceback(self):
        """Print the stored formatted traceback (None if the job never died)."""
        print(self._tb)

    def run(self):
        """Thread entry point: execute call() and record the outcome.

        On success the return value is stored in `result` and the job is
        marked completed; on any exception the job is marked dead and the
        formatted traceback is kept in `_tb`."""
        try:
            self.status    = BackgroundJobBase.stat_running
            self.stat_code = BackgroundJobBase.stat_running_c
            self.result    = self.call()
        except:
            # Deliberately catch everything (including SystemExit raised by
            # the job): otherwise the status would never leave 'running'.
            self.status    = BackgroundJobBase.stat_dead
            self.stat_code = BackgroundJobBase.stat_dead_c
            self.finished  = None
            self.result    = ('<BackgroundJob died, call jobs.traceback() for details>')
            self._tb       = self._make_tb()
        else:
            self.status    = BackgroundJobBase.stat_completed
            self.stat_code = BackgroundJobBase.stat_completed_c
            self.finished  = True
 
 
class BackgroundJobExpr(BackgroundJobBase):
    """Background job that evaluates an expression in a separate thread."""

    def __init__(self, expression, glob=None, loc=None):
        """Build a job from a string suitable for eval().

        `glob` and `loc` are the optional globals/locals dicts handed to
        eval(); each defaults to a fresh empty dict when None."""

        # Compile eagerly so that an invalid expression raises right here,
        # in the caller, rather than later inside the job's thread.
        compiled = compile(expression, '<BackgroundJob compilation>', 'eval')
        self.code = compiled

        self.expression = expression
        self.strform = expression
        self.glob = glob if glob is not None else {}
        self.loc = loc if loc is not None else {}
        self._init()

    def call(self):
        """Evaluate the compiled expression and return its value."""
        value = eval(self.code, self.glob, self.loc)
        return value
 
 
class BackgroundJobFunc(BackgroundJobBase):
    """Background job that runs a function call in a separate thread."""

    def __init__(self, func, *args, **kwargs):
        """Build a job from a callable object.

        Everything given after the callable, positional or keyword, is
        forwarded to it unchanged when the job runs.

        Raises TypeError if `func` is not callable."""

        if callable(func):
            self.func, self.args, self.kwargs = func, args, kwargs
        else:
            raise TypeError(
                'first argument to BackgroundJobFunc must be callable')

        # Only the function itself goes into the string form: building string
        # representations of the arguments can be _very_ expensive (e.g. with
        # large arrays).
        self.strform = str(func)
        self._init()

    def call(self):
        """Invoke the stored callable with the stored arguments."""
        target = self.func
        return target(*self.args, **self.kwargs)