import functools

from toolz.itertoolz import partition_all
from toolz.utils import no_default


def _reduce(func, seq, initial=None):
    if initial is None:
        return functools.reduce(func, seq)
    else:
        return functools.reduce(func, seq, initial)


def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
    """
    Reduce without guarantee of ordered reduction.

    inputs:

    ``binop``     - associative operator. The associative property allows us
                    to leverage a parallel map to perform reductions in
                    parallel.
    ``seq``       - a sequence to be aggregated
    ``default``   - an identity element like 0 for ``add`` or 1 for ``mul``
    ``map``       - an implementation of ``map``. This may be parallel and
                    determines how work is distributed.
    ``chunksize`` - Number of elements of ``seq`` that should be handled
                    within a single function call
    ``combine``   - Binary operator to combine two intermediate results.
                    If ``binop`` is of type (total, item) -> total
                    then ``combine`` is of type (total, total) -> total
                    Defaults to ``binop`` for common case of operators like
                    add

    Fold chunks up the collection into blocks of size ``chunksize`` and then
    feeds each of these to calls to ``reduce``. This work is distributed
    with a call to ``map``, gathered back and then refolded to finish the
    computation. In this way ``fold`` specifies only how to chunk up data but
    leaves the distribution of this work to an externally provided ``map``
    function. This function can be sequential or rely on multithreading,
    multiprocessing, or even distributed solutions.

    If ``map`` intends to serialize functions it should be prepared to accept
    and serialize lambdas. Note that the standard ``pickle`` module fails
    here.

    Example
    -------

    >>> # Provide a parallel map to accomplish a parallel sum
    >>> from operator import add
    >>> fold(add, [1, 2, 3, 4], chunksize=2, map=map)
    10
    """
    assert chunksize > 1

    if combine is None:
        combine = binop

    chunks = partition_all(chunksize, seq)

    # Evaluate sequence in chunks via map
    if default == no_default:
        results = map(
            functools.partial(_reduce, binop),
            chunks)
    else:
        results = map(
            functools.partial(_reduce, binop, initial=default),
            chunks)

    results = list(results)  # TODO: Support complete laziness

    if len(results) == 1:    # Return completed result
        return results[0]
    else:                    # Recurse to reaggregate intermediate results
        return fold(combine, results, map=map, chunksize=chunksize)
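

# ---------------------------------------------------------------------------
# A minimal usage sketch, not part of the library itself. It assumes the
# built-in ``map`` as a stand-in for a parallel one and defines a small
# ``count`` helper purely for illustration, to show the ``binop``/``combine``
# distinction from the docstring: ``binop`` folds items into a per-chunk
# total, while ``combine`` merges two chunk totals.
if __name__ == "__main__":
    from operator import add

    def count(total, _item):
        # binop of type (total, item) -> total; here it simply counts items
        return total + 1

    # Each chunk of 3 items is reduced with ``count`` starting from the
    # identity ``default=0``; the per-chunk counts are then merged with
    # ``combine=add``.
    print(fold(count, range(10), default=0, map=map, chunksize=3,
               combine=add))  # -> 10

    # A parallel map (e.g. from ``multiprocessing.Pool``) could be passed
    # instead, provided it can serialize the reducing function; a top-level
    # function like ``count`` pickles fine, whereas a lambda would not.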