aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/s-expressions/suites/Optimizers/TakeLimitAfterGroup.yqls
blob: d0fa6fb1ddd132aff9091d2b0c3ecbff81213199 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
(
# Input stage: declare the ('yt 'plato) data source and issue a Read! for the
# table keyed by the string "Input". The (Void) and '() arguments are passed
# through unchanged (presumably "no column restriction" and "no settings" —
# confirm against the Read! callable docs).
(let mr_source (DataSource 'yt 'plato))
(let x (Read! world mr_source
    (Key '('table (String 'Input)))
    (Void) '()))
# Read! yields a pair: the left element is rebound as `world`, the right
# element is kept as the table contents.
(let world (Left! x))
(let table1 (Right! x))

# Grouping key selector: returns the 'key member of a row.
(let keyExtractor (lambda '(item) (Member item 'key)))
# Group handler passed to PartitionByKey. It receives a stream of pairs and,
# via FlatMap, emits one struct per pair:
#   key    - element 0 of the pair
#   subkey - the constant string "."
#   value  - ToString of the sum computed over element 1 of the pair
(let handler (lambda '(stream) (FlatMap stream (lambda '(pair) (block '(
  (let key (Nth pair '0))
  # Element 1 of the pair is wrapped in ForwardList before being folded.
  (let list (ForwardList (Nth pair '1)))
  # Build the output struct incrementally; `s` is rebound at each step.
  (let s (Struct))
  (let s (AddMember s 'key key))
  (let s (AddMember s 'subkey (String '.)))
  # Sum the rows' 'subkey members, each parsed as Uint64; Coalesce substitutes
  # 0 when FromString yields no value (presumably on unparsable input — confirm).
  (let f (Fold list (Uint64 '0) (lambda '(item state) (block '(
      (let value (Coalesce (FromString (Member item 'subkey) 'Uint64) (Uint64 '0)))
      (return (+ value state))
  )))))
  (let s (AddMember s 'value (ToString f)))
  # Just wraps the struct so FlatMap emits exactly one row for this pair.
  (return (Just s))
))))))

# Pipeline under test: group table1 by keyExtractor and run handler over the
# groups; the two (Void) arguments are left unset (presumably the in-group
# sort direction and sort key — confirm against PartitionByKey docs).
(let table2 (PartitionByKey table1 keyExtractor (Void) (Void) handler))
# Limit to 4 rows immediately after the grouping, i.e. before the Sort below.
# NOTE(review): this ordering appears to be the point of the test — do not reorder.
(let table2 (Take table2 (Uint64 '4)))
# Sort the limited rows by their 'value member; (Bool 'true) is the direction
# flag (presumably ascending — confirm).
(let table2 (Sort table2 (Bool 'true) (lambda '(item) (Member item 'value))))
# Output stage: write table2 to the 'result data sink with an empty Key and
# empty settings list, commit that sink, and return the final world.
(let res_sink (DataSink 'result))
(let world (Write! world res_sink
    (Key)
    table2 '()))

(let world (Commit! world res_sink))
(return world)
)