blob: 6d0405e5628e880df6e89be037a932e9f30ecb69 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
(
# Stage 1: read the "key" column of table "Input" from the yt/plato data
# source, aggregate it, and write the outcome to the result sink.
# NOTE(review): this looks like the expansion of
#   SELECT COUNT(DISTINCT key) FROM plato.Input
# -- confirm against the originating SQL test, if there is one.
(let world (block '(
    (let read_result (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key") '()))
    # Read! yields a pair: Left! is the new world, Right! is the table data.
    (let world (Left! read_result))
    (let input_table (Right! read_result))

    (let result_rows (block '(
        (let projected (block '(
            (let aggregated (block '(
                # Handlers handed to AggregationTraits for the Count0 aggregate.
                (let count_init (lambda '(row) (AggrCountInit row)))
                (let count_update (lambda '(row state) (AggrCountUpdate row state)))
                # save/load/finish pass their argument through unchanged.
                (let count_save (lambda '(state) state))
                (let count_load (lambda '(item) item))
                # Two partial states are combined by addition via OptionalReduce.
                (let count_merge (lambda '(lhs rhs) (OptionalReduce lhs rhs (lambda '(l r) (+ l r)))))
                (let count_finish (lambda '(state) state))
                # Empty key list '() => no grouping columns; (Uint64 '0) is the
                # last AggregationTraits argument.
                # NOTE(review): the trailing '"key" in the aggregation tuple
                # presumably marks the aggregate as DISTINCT over "key" -- confirm.
                (return (Aggregate input_table '() '('('Count0 (AggregationTraits (StructMemberType (ListItemType (TypeOf input_table)) '"key") count_init count_update count_save count_load count_merge count_finish (Uint64 '0)) '"key"))))
            )))

            # Re-shape every aggregated row into a struct with the single
            # member "column0" holding the Count0 value.
            (let shaped (FlatMap aggregated (lambda '(row) (block '(
                (let out_row (AddMember (Struct) '"column0" (Member row 'Count0)))
                (return (AsList out_row))
            )))))
            (return shaped)
        )))
        (return projected)
    )))

    # Write the rows to the result sink and commit it.
    (let world (block '(
        (let res_sink (DataSink 'result))
        (let world (Write! world res_sink (Key) result_rows '('('type) '('autoref) '('columns '('"column0")))))
        (return (Commit! world res_sink))
    )))
    (return world)
)))

# Stage 2: commit the yt/plato data sink.
(let world (block '(
    (let yt_sink (DataSink '"yt" '"plato"))
    (let world (Commit! world yt_sink))
    (return world)
)))
(return world)
)
|