aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/s-expressions/suites/InMem/Hopping.yqls
blob: 0ae29c359401cf77e7265cd0c42a335c87499bd1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
(
# Apply the 'PureDataSource 'yt setting through the 'config data source.
(let config (DataSource 'config))
(let world
    (Configure! world config
        'PureDataSource 'yt))

# Time extractor passed to HoppingCore: wraps the row's 'time member in Just.
(let timeExtractor
    (lambda '(row)
        (Just (Member row 'time))))

# Builds the initial aggregation state from a single row:
# the row's 'sum and 'max members are copied into a new struct.
(let init
    (lambda '(row)
        (AsStruct
            '('sum (Member row 'sum))
            '('max (Member row 'max)))))
# Folds one more row into an existing state:
# 'sum is combined with AggrAdd, 'max with AggrMax (row first, state second).
(let update
    (lambda '(row acc)
        (AsStruct
            '('sum (AggrAdd (Member row 'sum) (Member acc 'sum)))
            '('max (AggrMax (Member row 'max) (Member acc 'max))))))
# Combines two states into one:
# 'sum via AggrAdd and 'max via AggrMax, member by member.
(let merge
    (lambda '(lhs rhs)
        (AsStruct
            '('sum (AggrAdd (Member lhs 'sum) (Member rhs 'sum)))
            '('max (AggrMax (Member lhs 'max) (Member rhs 'max))))))
# save and load are identity lambdas: the state is passed through unchanged.
(let save
    (lambda '(st) st))
(let load
    (lambda '(st) st))
# finish attaches the supplied time to the state as the '_yql_time member.
(let finish
    (lambda '(st ts)
        (AddMember st '_yql_time ts)))

# Input for HoppingCore: an iterator over a fixed list of rows, each a struct
# with 'time (Timestamp), 'sum (Uint32) and 'max (String).
# Row order is significant: the row with time 55 is listed after the row with time 85.
(let stream
    (Iterator
        (AsList
            (AsStruct '('time (Timestamp  '1)) '('sum (Uint32 '2)) '('max (String 'f)))
            (AsStruct '('time (Timestamp  '2)) '('sum (Uint32 '3)) '('max (String 'a)))
            (AsStruct '('time (Timestamp '15)) '('sum (Uint32 '4)) '('max (String 'e)))
            (AsStruct '('time (Timestamp '23)) '('sum (Uint32 '6)) '('max (String 'h)))
            (AsStruct '('time (Timestamp '24)) '('sum (Uint32 '5)) '('max (String 'd)))
            (AsStruct '('time (Timestamp '25)) '('sum (Uint32 '7)) '('max (String 's)))
            (AsStruct '('time (Timestamp '40)) '('sum (Uint32 '2)) '('max (String 'j)))
            (AsStruct '('time (Timestamp '47)) '('sum (Uint32 '1)) '('max (String 't)))
            (AsStruct '('time (Timestamp '51)) '('sum (Uint32 '6)) '('max (String 'b)))
            (AsStruct '('time (Timestamp '59)) '('sum (Uint32 '2)) '('max (String 'c)))
            (AsStruct '('time (Timestamp '85)) '('sum (Uint32 '8)) '('max (String 'g)))
            (AsStruct '('time (Timestamp '55)) '('sum (Uint32 '1000)) '('max (String 'z)))
            (AsStruct '('time (Timestamp '200)) '('sum (Uint32 '0)) '('max (String 'a)))
        )))

# The row with time 55 arrives late (it is listed after the row with time 85),
# so it should be excluded from aggregation because of the delay setting.

# Run the hopping aggregation over the input rows.
# Each input row is Struct<time:Timestamp, sum:Uint32, max:String>; the former
# `itemType` binding that spelled this out was never referenced, so it is
# kept here as a comment only.
# NOTE(review): the three Interval arguments are presumably hop = 10,
# interval = 30 and delay = 20, in that order - confirm against the
# HoppingCore signature.
(let res
    (HoppingCore stream timeExtractor
        (Interval '10) (Interval '30) (Interval '20)
        init update save load merge finish))

# Write the collected hopping result to the 'result sink with the 'type
# option, commit the sink, and return the resulting world.
(let res_sink (DataSink 'result))
(let world
    (Write! world res_sink (Key) (Collect res)
        '('('type))))
(let world
    (Commit! world res_sink))
(return world)
)