aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/s-expressions/suites/ManyYamrOperations/MapThenGroup.yql
blob: 6248b7416ea682ba3e96ea2efd62e4b4cabaeff4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
(
# Pipeline: read table Input from yt/plato, keep the rows whose 'key parses
# as an Int32 strictly below a threshold, group the survivors by 'key and
# append one row per group (key, subkey '.', value = group size) to Output.
(let ytSource (DataSource 'yt 'plato))
(let readResult (Read! world ytSource (Key '('table (String 'Input))) '('key 'subkey 'value) '()))
# Read! yields a pair: the updated world on the left, the table data on the right.
(let world (Left! readResult))
(let inputRows (Right! readResult))
# The threshold is written as a hex-encoded atom.
# NOTE(review): presumably these are the little-endian bytes of the Int32 value 100 - confirm.
(let threshold (Int32 'x"64000000"))
# Outer FlatMap walks the rows; inner FlatMap runs over the result of
# FromString and emits the row only when the parsed key is below the threshold.
# NOTE(review): rows whose 'key does not parse are presumably dropped here - confirm.
(let lowRows (FlatMap inputRows (lambda '(row)
   (FlatMap (FromString (Member row 'key) 'Int32) (lambda '(keyAsInt)
      (ListIf (< keyAsInt threshold) row)
   ))
)))
# Group handler: each element of the stream is a pair whose item 0 is the
# group key and item 1 is the group's rows; it is turned into a single-row list.
(let countPerKey (lambda '(groups) (FlatMap groups (lambda '(group) (block '(
  (let groupRows (ForwardList (Nth group '1)))
  (let rowCount (ToString (Length groupRows)))
  (let outRow
    (AddMember
      (AddMember
        (AddMember (Struct) 'key (Nth group '0))
        'subkey (String '.))
      'value rowCount))
  (return (AsList outRow))
))))))
# Partition by the 'key member.
# NOTE(review): the two (Void) arguments presumably mean "no sorting inside a group" - confirm.
(let counts (PartitionByKey lowRows (lambda '(row) (Member row 'key)) (Void) (Void) countPerKey))
(let ytSink (DataSink 'yt 'plato))
# Append the per-key counts to Output, then commit the sink.
(let world (Write! world ytSink (Key '('table (String 'Output))) counts '('('mode 'append))))
(return (Commit! world ytSink))
)