about summary refs log tree commit diff stats
path: root/yql/essentials/tests/s-expressions/suites/InMem/Fold1MapOverPreservedStream.yqls
blob: 1a3aad97c979e78e6ec5c9a88f0799716b70daa6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
(
# Configure the world with 'PureDataSource 'yt through the 'config data source.
(let cfg_source (DataSource 'config))
(let world (Configure! world cfg_source 'PureDataSource 'yt))

# Sink that later receives the final result.
(let res_sink (DataSink 'result))

# Input rows: (Uint32 key, String value) tuples with keys 1, 1, 2, 3, 5, 8,
# so every key from the third on equals the sum of the two keys before it.
(let pairs (AsList
    '((Uint32 '1) (String 'r))
    '((Uint32 '1) (String 'a))
    '((Uint32 '2) (String 'b))
    '((Uint32 '3) (String 'b))
    '((Uint32 '5) (String 'i))
    '((Uint32 '8) (String 't))
))

# Queue item type is derived from the list itself; for the rows above this is
# the same type as the explicit spelling
#   (TupleType (DataType 'Uint32) (DataType 'String))
(let pair_type (ListItemType (TypeOf pairs)))

# NOTE(review): the two numeric QueueCreate arguments are presumably the queue
# capacity (5) and its initial size (0) - confirm against the callable docs.
(let queue_arg_a (Uint64 '5))
(let queue_arg_b (Uint64 '0))
(let queue (QueueCreate pair_type queue_arg_a queue_arg_b))

# Wrap the plain iterator so that rows stay reachable through `queue`.
# NOTE(review): the trailing 2 is presumably how far the queue runs ahead of
# the consumer (the handlers peek at offsets 1 and 2) - confirm.
(let raw_stream (Iterator pairs))
(let preserve_arg (Uint64 '2))
(let stream (PreserveStream raw_stream queue preserve_arg))

# First handler passed to Fold1Map.
# Takes one row `item` = (key, value) and returns a pair:
#   output row = (key, value, key, value, validate, words)
#   state      = (key, value)
# so the running key/value in the state start out as the row's own fields.
(let init (lambda '(item) (block '(
    # Rows peeked from the shared queue at offsets 1 and 2, tied to `item`
    # through DependsOn.
    # NOTE(review): the Coalesce calls below suggest a peek may yield an
    # empty value (e.g. near the end of the stream) - confirm.
    (let peek_one (QueuePeek queue (Uint64 '1) (DependsOn item)))
    (let peek_two (QueuePeek queue (Uint64 '2) (DependsOn item)))

    (let cur_key (Nth item '0))
    (let cur_val (Nth item '1))

    # Defaults substituted for missing peeked fields.
    (let zero_key (Uint32 '0))
    (let empty_val (String '""))

    # validate: key + key@1 == key@2, with missing keys counted as 0.
    (let pair_sum (+ cur_key (Coalesce (Nth peek_one '0) zero_key)))
    (let validate (== pair_sum (Coalesce (Nth peek_two '0) zero_key)))

    # words: value ++ value@1 ++ value@2, with missing values counted as "".
    (let first_two (Concat cur_val (Coalesce (Nth peek_one '1) empty_val)))
    (let words (Concat first_two (Coalesce (Nth peek_two '1) empty_val)))

    (let emitted '(cur_key cur_val cur_key cur_val validate words))
    (let carried '(cur_key cur_val))
    (return '(emitted carried))
))))

# Second handler passed to Fold1Map.
# Takes a row `item` = (key, value) and the previous `state` =
# (running key sum, running value concatenation) and returns a pair:
#   output row = (key, value, new sum, new concatenation, validate, words)
#   state      = (new sum, new concatenation)
(let update (lambda '(item state) (block '(
    # Rows peeked from the shared queue at offsets 1 and 2, tied to `item`
    # through DependsOn.
    # NOTE(review): the Coalesce calls below suggest a peek may yield an
    # empty value (e.g. near the end of the stream) - confirm.
    (let peek_one (QueuePeek queue (Uint64 '1) (DependsOn item)))
    (let peek_two (QueuePeek queue (Uint64 '2) (DependsOn item)))

    (let cur_key (Nth item '0))
    (let cur_val (Nth item '1))

    # Extend the accumulators carried in `state` with the current row.
    (let total_key (+ (Nth state '0) cur_key))
    (let joined_val (Concat (Nth state '1) cur_val))

    # Defaults substituted for missing peeked fields.
    (let zero_key (Uint32 '0))
    (let empty_val (String '""))

    # validate: key + key@1 == key@2, with missing keys counted as 0.
    (let pair_sum (+ cur_key (Coalesce (Nth peek_one '0) zero_key)))
    (let validate (== pair_sum (Coalesce (Nth peek_two '0) zero_key)))

    # words: value ++ value@1 ++ value@2, with missing values counted as "".
    (let first_two (Concat cur_val (Coalesce (Nth peek_one '1) empty_val)))
    (let words (Concat first_two (Coalesce (Nth peek_two '1) empty_val)))

    (let emitted '(cur_key cur_val total_key joined_val validate words))
    (let carried '(total_key joined_val))
    (return '(emitted carried))
))))

# Run the preserved stream through Fold1Map using the `init` and `update`
# handlers, then collect the produced rows.
(let folded (Fold1Map stream init update))
(let collected (Collect folded))

# Write the collected rows to the result sink with the 'type option set,
# commit the sink, and return the final world.
(let write_opts '('('type)))
(let world (Write! world res_sink (Key) collected write_opts))
(let world (Commit! world res_sink))
(return world)
)