blob: 4357fcf5c75dd29df079d9054c5dd0c4b7975f4a (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
(
#comment
(let config (DataSource 'config))
(let world (Configure! world config 'PureDataSource 'yt))
(let init (lambda '(x parent) '(parent (Member x 'value))))
(let init_distinct (lambda '(x parent) '(parent x)))
(let update_min (lambda '(x y parent) '(parent (Min (Member x 'value) (Nth y '1)))))
(let update_sum_distinct (lambda '(x y parent) '(parent (Add x (Nth y '1)))))
(let save (lambda '(x) x))
(let load (lambda '(x) x))
(let merge_min (lambda '(x y) (AggrMin x y)))
(let merge_sum (lambda '(x y) '((Nth x '0) (AggrAdd (Nth y '1) (Nth x '1)))))
(let finish (lambda '(x) (Nth x '1)))
(let finish_min2 (lambda '(x) '((Nth x '1) (* (Uint32 '2) (Nth x '1)))))
(let list (AsList
(AsStruct '('key (Uint32 '1)) '('value (Uint32 '2)))
(AsStruct '('key (Uint32 '2)) '('value (Uint32 '3)))
(AsStruct '('key (Uint32 '1)) '('value (Uint32 '4)))
(AsStruct '('key (Uint32 '3)) '('value (Uint32 '10)))
(AsStruct '('key (Uint32 '2)) '('value (Uint32 '5)))
(AsStruct '('key (Uint32 '2)) '('value (Uint32 '5)))
))
# non-distinct processes row
(let min (AggregationTraits (ListItemType (TypeOf list)) init update_min save load merge_min finish_min2 (Null)))
# distinct process one column and requires data/data? type
(let sum (AggregationTraits (StructMemberType (ListItemType (TypeOf list)) 'value) init_distinct update_sum_distinct save load merge_sum finish (Null)))
(let resAll (Aggregate list '() '('('('minvalue 'minvalue2) min) '('distsum sum 'value))))
(let res_sink (DataSink 'result))
(let world (Write! world res_sink (Key) resAll '('('type))))
(let resKey (Aggregate list '('key) '('('('minvalue 'minvalue2) min) '('distsum sum 'value))))
(let world (Write! world res_sink (Key) resKey '('('type))))
(let world (Commit! world res_sink))
(return world)
)
|