aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/s-expressions/suites/SingleYamrOperation/PartitionByKeyStream.yql
blob: 938bb3e8598d4145e1267ecfed68fcf3a6e50ec2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
(
#comment
# Reads columns key/subkey/value from table Input on 'yt/'plato, groups the
# rows by key, emits one summary row per group, sorts the summary by key and
# appends it to table Output on the same cluster.

# --- input ---
(let source (DataSource 'yt 'plato))
(let readResult
  (Read! world source
    (Key '('table (String 'Input)))
    '('key 'subkey 'value)
    '()))
# Read! yields a pair: the left half is rebound to world, the right half is
# used as the table contents.
(let world (Left! readResult))
(let input (Right! readResult))

# --- grouping ---
# Partitioning key: the 'key member of every input row.
(let byKey (lambda '(row) (Member row 'key)))

# Stream handler: receives a stream of pairs and flat-maps over it.
# For each pair, element '0 is taken as the group key and element '1 is
# wrapped in ForwardList so its Length can be computed. The emitted struct is
#   key    = the group key
#   subkey = the literal string "."
#   value  = the group's length rendered with ToString
# and it is returned as an iterator over a single-element list.
(let countPerKey
  (lambda '(groups)
    (FlatMap groups
      (lambda '(group) (block '(
        (let groupKey (Nth group '0))
        (let rows (ForwardList (Nth group '1)))
        (let summary (Struct))
        (let summary (AddMember summary 'key groupKey))
        (let summary (AddMember summary 'subkey (String '.)))
        (let summary (AddMember summary 'value (ToString (Length rows))))
        (return (Iterator (AsList summary)))
      ))))))

# NOTE(review): the two (Void) arguments presumably mean "no ordering inside
# a partition" - confirm against the PartitionByKey callable definition.
(let counted (PartitionByKey input byKey (Void) (Void) countPerKey))

# --- output ---
(let sink (DataSink 'yt 'plato))
# NOTE(review): (Bool 'true) is presumably the "ascending" flag - confirm.
(let ordered
  (Sort counted (Bool 'true)
    (lambda '(row) (Member row 'key))))
(let world
  (Write! world sink
    (Key '('table (String 'Output)))
    ordered
    '('('mode 'append))))
(let world (Commit! world sink))
(return world)
)