aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/s-expressions/suites/SingleYamrOperation/CombineByKeyStream.yql
blob: 4957a8671d7823c994aea1800c8f8983a5527c1f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
(
# CombineByKey test where the pre-map and finish lambdas both return an Iterator.
# Rows are grouped by key, the subkey values parsed as Uint32 are summed per
# group, and one output row per group is written to table Output.
(let source (DataSource 'yt 'plato))
(let sink (DataSink 'yt 'plato))

# Read columns key, subkey and value from table Input.
(let readResult (Read! world source
  (Key '('table (String 'Input)))
  '('key 'subkey 'value)
  '()))
(let world (Left! readResult))
(let inputRows (Right! readResult))

# Pre-map: wrap every row into a single-element iterator.
(let preMap (lambda '(row) (Iterator (AsList row))))

# Grouping key is the key column.
(let keyExtractor (lambda '(row) (Member row 'key)))

# Initial state: subkey of the first row of the group converted via FromString to Uint32.
(let init (lambda '(groupKey row)
  (FromString (Member row 'subkey) 'Uint32)))

# Update: add the converted subkey of the next row to the running state.
(let update (lambda '(groupKey row total)
  (+ total (FromString (Member row 'subkey) 'Uint32))))

# Finish: emit one struct with key, a constant subkey and the state rendered
# as a string, coalesced with an empty string, wrapped into an iterator.
(let finish (lambda '(groupKey total) (block '(
  (let totalText (Coalesce (Map total (lambda '(n) (ToString n))) (String '"")))
  (let withKey (AddMember (Struct) 'key groupKey))
  (let withSubkey (AddMember withKey 'subkey (String '.)))
  (let outRow (AddMember withSubkey 'value totalText))
  (return (Iterator (AsList outRow)))
))))

(let combined (CombineByKey inputRows preMap keyExtractor init update finish))

# Sort the combined rows on the key column, direction flag set to true.
(let sorted (Sort combined (Bool 'true) (lambda '(row) (Member row 'key))))

# Append the sorted rows to table Output and commit the sink.
(let world (Write! world sink
  (Key '('table (String 'Output)))
  sorted
  '('('mode 'append))))
(let world (Commit! world sink))
(return world)
)