blob: 5a39b908878cc16875294bcd1feedb9def767236 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
(
(let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key" '"subkey" '"value") '()))
(let world (Left! x))
(let table0 (Right! x))
(let preMap (lambda '(item) (Just item)))
(let keyExtractor (lambda '(item) (Member item 'key)))
(let init (lambda '(key item) (Member item 'value)))
(let update (lambda '(key item state) (Concat state (Member item 'value))))
(let finish (lambda '(key state) (block '(
(let s (Struct))
(let s (AddMember s 'key key))
(let s (AddMember s 'subkey (String '.)))
(let s (AddMember s 'value state))
(return (Just s))
))))
(let combine (CombineByKey table0 preMap keyExtractor init update finish))
(let listHandler (lambda '(stream) (FlatMap stream (lambda '(pair) (block '(
(let key (Nth pair '0))
(let list (ForwardList (Nth pair '1)))
(let s (Struct))
(let s (AddMember s 'key key))
(let s (AddMember s 'subkey (String '.)))
(let value (Fold1 list
(lambda '(item) (Member item 'value))
(lambda '(item state) (Concat state (Member item 'value)))
))
(let s (AddMember s 'value (Coalesce value (String '""))))
(let ret (AsList s))
(return ret)
))))))
(let reducedTable (PartitionByKey combine keyExtractor (Void) (Void) listHandler))
(let mr_sink (DataSink 'yt (quote plato)))
(let world (Write! world mr_sink (Key '('table (String 'Output))) reducedTable '('('mode 'append))))
(let world (Commit! world mr_sink))
(return world)
)
|