aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/s-expressions/suites/Optimizers/DemuxOverExtend.yql
blob: b8eba4c967dbae4740a5637c974bd42b26336310 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
(
# Test program: two Map stages each wrap every input row into a two-way
# Variant, the results are concatenated with Extend, and Demux is applied
# directly on top of that Extend. Both demuxed outputs are then written to
# the same Output table.

# Source and sink handles for the 'yt provider, 'plato target.
(let mr_source (DataSource 'yt 'plato))
(let mr_sink (DataSink 'yt 'plato))

# Read table Input. Read! yields a pair: Left! is the new world, Right! is
# the table contents.
(let x (Read! world mr_source (Key '('table (String 'Input))) (Void) '()))
(let world (Left! x))
(let data (Right! x))

# Variant over a two-element tuple. Both alternatives carry the same row
# struct (key, subkey, value - all String), so only the alternative index
# distinguishes them.
(let varTuple (VariantType (TupleType
    (StructType
        '('key (DataType 'String))
        '('subkey (DataType 'String))
        '('value (DataType 'String))
    )
    (StructType
        '('key (DataType 'String))
        '('subkey (DataType 'String))
        '('value (DataType 'String))
    )
)))

# First stream: rows whose key, parsed as Int32, is even go to alternative 0;
# all other rows go to alternative 1.
(let varData1 (Map data (lambda '(item) (block '(
    (let intValue (FromString (Member item 'key) 'Int32))
    (let res
        # Coalesce turns an empty comparison result into false, so such rows
        # take the second branch (alternative 1). Presumably this covers keys
        # that do not parse as Int32 - confirm against FromString semantics.
        (If (Coalesce (Equal (% intValue (Int32 '2)) (Int32 '0)) (Bool 'false))
            (Variant item '0 varTuple)
            (Variant item '1 varTuple)
        )
    )
   (return res)
)))))

# Second stream: same routing, but the parity test is applied to key + 1, so
# rows with an odd numeric key go to alternative 0 here.
(let varData2 (Map data (lambda '(item) (block '(
    (let intValue (FromString (Member item 'key) 'Int32))
    # Rebinds intValue to the incremented value used by the test below.
    (let intValue (+ intValue (Int32 '1)))
    (let res
        # Same Coalesce fallback as in varData1: an empty condition selects
        # alternative 1.
        (If (Coalesce (Equal (% intValue (Int32 '2)) (Int32 '0)) (Bool 'false))
            (Variant item '0 varTuple)
            (Variant item '1 varTuple)
        )
    )
   (return res)
)))))

# Demux over Extend: the construct this test targets. The result is indexed
# with Nth below, one element per variant alternative.
(let dataTuple (Demux (Extend varData1 varData2)))

(let data1 (Nth dataTuple '0))
(let data2 (Nth dataTuple '1))

# Only the alternative-0 output is filtered. Both operands are String values,
# so key is compared with '200 as a string, not as a number.
(let data1 (Filter data1 (lambda '(item) (Less (Member item 'key) (String '200)))))

# Both outputs target the same table: the first write uses mode renew, the
# second uses mode append.
(let world (Write! world mr_sink (Key '('table (String 'Output))) data1 '('('mode 'renew))))
(let world (Write! world mr_sink (Key '('table (String 'Output))) data2 '('('mode 'append))))

# Commit the sink and return the final world.
(let world (Commit! world mr_sink))
(return world)
)