aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/s-expressions/suites/Udf/RecordRemapReduce.yql
blob: df1bf89415a4d3958ea00b73c8ca625b257e811f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
(
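# Reads table Input, groups its rows by 'key and, for every input row, appends a
# row {key, subkey = ".", value = <number of rows in that key's group>} to table Output.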
# Data source built from (DataSource 'yt 'plato).
(let source (DataSource 'yt 'plato))
# Address table Input and request its 'key, 'subkey and 'value columns.
(let inputKey (Key '('table (String 'Input))))
(let readResult (Read! world source inputKey '('key 'subkey 'value) '()))
# Split the Read! result: the left part is rebound as world, the right part
# is the table content consumed by the statements below.
(let world (Left! readResult))
(let table1 (Right! readResult))
# Handler applied to the partitioned input. `groups` is iterated with FlatMap and
# each element is treated as a pair: item 0 is the group key, item 1 the group's rows.
# For every row of a group it emits one struct
#   {key = <group key>, subkey = ".", value = <row count of the group, as string>}.
(let listHandler (lambda '(groups) (block '(
  (let perGroup (lambda '(group) (block '(
    (let groupKey (Nth group '0))
    # Collect the rows once; the result is both measured and iterated below.
    (let rows (Collect (Nth group '1)))
    (let groupSize (ToString (Length rows)))
    # The row itself is not inspected: only the group key and size are emitted.
    (let emitRow (lambda '(row) (block '(
      (let blank (Struct))
      (let withKey (AddMember blank 'key groupKey))
      (let withSubkey (AddMember withKey 'subkey (String '.)))
      (let record (AddMember withSubkey 'value groupSize))
      (return (AsList record))
    ))))
    (return (FlatMap rows emitRow))
  ))))
  (return (FlatMap groups perGroup))
))))
# Key extractor: picks the 'key member of a row.
(let keySelector (lambda '(row) (block '(
  (return (Member row 'key))
))))
# Partition table1 by that key and feed the partitions to listHandler.
# NOTE(review): the two (Void) arguments presumably mean "no sort direction" and
# "no sort-key extractor" - confirm against the PartitionByKey reference.
(let reducedTable (PartitionByKey
  table1
  keySelector
  (Void)
  (Void)
  listHandler))
# Data sink built from (DataSink 'yt 'plato).
(let sink (DataSink 'yt 'plato))
# Write reducedTable to table Output with mode 'append.
(let outputKey (Key '('table (String 'Output))))
(let written (Write! world sink outputKey reducedTable '('('mode 'append))))
# Commit the sink and return the resulting world as the program result.
(return (Commit! written sink))
)