aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/s-expressions/suites/SingleYamrOperation/PartitionByKeySorted.yqls
blob: 88c71a6f9dd54f4fc60f0ba056c2c67042938848 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
(
#comment
(let mr_source (DataSource 'yt 'plato))
(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))
(let world (Left! x))
(let table1 (Right! x))
(let keySelector (lambda '(x) (Member x 'key)))
(let sortKeySelector (lambda '(x) (Member x 'value)))
(let listHandler (lambda '(groups) (block '(
  (return (Map groups (lambda '(group) (block '(
     (let key (Nth group '0))
     (let stream (Nth group '1))
     (let s (Struct))
     (let s (AddMember s 'key key))
     (let s (AddMember s 'subkey (String '.)))
     (let s (AddMember s 'value (Collect (Condense stream (String '"") (lambda '(item state) (Bool 'False)) (lambda '(item state) (Concat (Concat state (String '" ")) (Member item 'value)))))))
     (return s)
  )))))
))))
(let reducedTable (PartitionByKey table1 keySelector (Bool 'false) sortKeySelector listHandler))
(let mr_sink (DataSink 'yt (quote plato)))
(let world (Write! world mr_sink (Key '('table (String 'Output))) reducedTable '('('mode 'append))))
(let world (Commit! world mr_sink))
(return world)
)