aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/s-expressions/suites/MultiIO/MapOverDemux.yql
blob: 4697cb40bafc043660876b457aa4218b40ba8347 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
(
# Reads Input1, tags every row with one of two tuple-variant alternatives
# (chosen by the parity of 'key parsed as Int32), splits the tagged list
# with Demux, filters the first stream and writes both streams out.
(let source (DataSource 'yt 'plato))
(let sink (DataSink 'yt 'plato))

# The result of Read! is taken apart with Left! (new world) and Right! (rows).
(let readResult (Read! world source (Key '('table (String 'Input1))) (Void) '()))
(let world (Left! readResult))
(let rows (Right! readResult))

# Both variant alternatives carry the same row struct, so it is declared once.
(let rowType (StructType
    '('key (DataType 'String))
    '('subkey (DataType 'String))
    '('value (DataType 'String))
))
(let splitType (VariantType (TupleType rowType rowType)))

# Alternative '0 receives rows whose key is an even integer; every other row
# goes to alternative '1. Coalesce substitutes false when the comparison
# yields no value (presumably when 'key does not parse as Int32 - confirm),
# which also routes such rows to alternative '1.
(let tagged (Map rows (lambda '(item) (block '(
    (let keyAsInt (FromString (Member item 'key) 'Int32))
    (let isEven (Coalesce (Equal (% keyAsInt (Int32 '2)) (Int32 '0)) (Bool 'false)))
    (return (If isEven
        (Variant item '0 splitType)
        (Variant item '1 splitType)
    ))
)))))

# Demux turns the tagged list into a tuple with one element per alternative.
(let streams (Demux tagged))
(let evenRows (Nth streams '0))
(let restRows (Nth streams '1))

# Only the first stream is filtered; 'key is compared as a String with "200".
(let evenBelow200 (Filter evenRows (lambda '(item) (Less (Member item 'key) (String '200)))))

(let world (Write! world sink (Key '('table (String 'Output1))) evenBelow200 '('('mode 'renew))))
(let world (Write! world sink (Key '('table (String 'Output2))) restRows '('('mode 'renew))))

(let world (Commit! world sink))
(return world)
)