blob: 27c4faf64735f44e07b961fc6e871958431d3771 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
(
#comment
(let res_sink (DataSink 'result))
(let mr_source (DataSource 'yt 'plato))
(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))
(let world (Left! x))
(let table1 (Right! x))
(let init (lambda '(x parent) '(parent (StrictFromString (Member x 'value) 'Uint32))))
(let init_distinct (lambda '(x parent) '(parent (StrictFromString x 'Uint32))))
(let update_sum_distinct (lambda '(x y parent) '(parent (+ (StrictFromString x 'Uint32) (Nth y '1)))))
(let save (lambda '(x) x))
(let load (lambda '(x) x))
(let merge_sum (lambda '(x y) '((Nth x '0) (AggrAdd (Nth x '1) (Nth y '1)))))
(let finish (lambda '(x) (Nth x '1)))
# distinct process one column and requires data/data? type
(let sum (AggregationTraits (StructMemberType (ListItemType (TypeOf table1)) 'value) init_distinct update_sum_distinct save load merge_sum finish (Null)))
(let resAll (Aggregate table1 '() '('('distsum sum 'value))))
(let world (Write! world res_sink (Key) resAll '('('type))))
(let resKey (Aggregate table1 '('key) '('('distsum sum 'value))))
(let sortedOutput (Sort resKey (Bool 'false) (lambda '(x) (Member x 'key) )))
(let world (Write! world res_sink (Key) sortedOutput '('('type))))
(let world (Commit! world res_sink))
(return world)
)
|