blob: c28ef4d0352c0670d4fed0deb70aeaf5cc5fab2c (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
(
# Reads the 'Input table from the yt/plato data source, partitions its rows
# by 'key, produces rows from each partition via listHandler, then appends
# the result to the 'Output table and commits.
(let source (DataSource 'yt 'plato))
(let readResult (Read! world source
    (Key '('table (String 'Input)))
    '('key 'subkey 'value)
    '()))
# Read! yields a pair: Left! is the updated world, Right! is the table data.
(let world (Left! readResult))
(let inputTable (Right! readResult))

# Partitioning key: the 'key member of a row.
(let keySelector (lambda '(row) (Member row 'key)))

# Handler for the partitioned stream. Each element is treated as a pair
# whose item 0 is the key and item 1 is the collection of that key's rows.
(let listHandler (lambda '(stream) (FlatMap stream (lambda '(pair) (block '(
    (let key (Nth pair '0))
    (let items (ForwardList (Nth pair '1)))
    # Enumerate, keep the first two entries, then drop the first of those,
    # so at most one entry per key survives.
    (let indexed (Enumerate items))
    (let firstTwo (Take indexed (Uint64 '2)))
    (let picked (Skip firstTwo (Uint64 '1)))
    # Builds a one-element list holding the output row for an enumerated
    # entry: item 0 and the 'value member of item 1 are stringified and
    # concatenated into 'value; 'subkey is the constant string ".".
    (let makeRow (lambda '(entry) (block '(
        (let label (Concat (ToString (Nth entry '0)) (ToString (Member (Nth entry '1) 'value))))
        (let row (AddMember (AddMember (AddMember (Struct) 'key key) 'subkey (String '.)) 'value label))
        (return (AsList row))
    ))))
    (return (FlatMap picked makeRow))
))))))

# The two (Void) arguments sit between the key selector and the handler
# (presumably "no sorting requested" -- confirm against PartitionByKey docs).
(let reducedTable (PartitionByKey inputTable keySelector (Void) (Void) listHandler))

(let sink (DataSink 'yt (quote plato)))
(let world (Write! world sink
    (Key '('table (String 'Output)))
    reducedTable
    '('('mode 'append))))
(let world (Commit! world sink))
(return world)
)
|